rmatharu commented on a change in pull request #1402:
URL: https://github.com/apache/samza/pull/1402#discussion_r458452574
##########
File path: samza-core/src/main/java/org/apache/samza/util/Util.java
##########
@@ -123,4 +131,85 @@ private static InetAddress doGetLocalHost() throws
UnknownHostException, SocketE
}
return localHost;
}
+
+ public static String getDeploymentType(Config config) {
+ JobConfig jobConfig = new JobConfig(config);
+ Optional<String> streamJobFactoryClass =
jobConfig.getStreamJobFactoryClass();
+ if (streamJobFactoryClass.isPresent()) {
+ if (streamJobFactoryClass.get().equals(YARN_JOB_FACTORY_CLASS)) {
+ return DeploymentType.YARN.name();
+ } else {
+ return DeploymentType.STANDALONE.name();
+ }
+ }
+ return "NOT_DEFINED";
+ }
+
+ public static String getApiType(Config config) {
+ ApplicationConfig appConfig = new ApplicationConfig(config);
+ String appClass = appConfig.getAppClass();
+ if (appClass == null || appClass.isEmpty()) {
+ return ApiType.SAMZA_LOW_LEVEL.name();
+ }
+ if (appClass.equals(BEAM_RUNNER_CLASS)) {
+ return ApiType.SAMZA_BEAM.name();
+ }
+ if (appClass.equals(SQL_RUNNER_CLASS)) {
+ return ApiType.SAMZA_SQL.name();
+ }
+ if (appClass.getClass().isInstance(StreamApplication.class)) {
+ return ApiType.SAMZA_HIGH_LEVEL.name();
+ }
+ return ApiType.SAMZA_LOW_LEVEL.name();
+ }
+
+ public static int getContainerCount(Config config) {
+ JobConfig jobConfig = new JobConfig(config);
+ return jobConfig.getContainerCount();
+ }
+
+ public static int getContainerMemoryMb(Config config) {
+ ClusterManagerConfig clusterManagerConfig = new
ClusterManagerConfig(config);
+ return clusterManagerConfig.getContainerMemoryMb();
+ }
+
+ public static int getNumCores(Config config) {
+ ClusterManagerConfig clusterManagerConfig = new
ClusterManagerConfig(config);
+ return clusterManagerConfig.getNumCores();
+ }
+
+ public static int getThreadPoolSize(Config config) {
+ JobConfig jobConfig = new JobConfig(config);
+ return jobConfig.getThreadPoolSize();
+ }
+
+ public static String getSspGrouperFactory(Config config) {
+ JobConfig jobConfig = new JobConfig(config);
+ return jobConfig.getSystemStreamPartitionGrouperFactory();
+ }
+
+ public static boolean getHostAffinityEnabled(Config config) {
+ ClusterManagerConfig clusterManagerConfig = new
ClusterManagerConfig(config);
+ return clusterManagerConfig.getHostAffinityEnabled();
+ }
+
+ public static int getContainerRetryCount(Config config) {
+ ClusterManagerConfig clusterManagerConfig = new
ClusterManagerConfig(config);
+ return clusterManagerConfig.getContainerRetryCount();
+ }
+
+ public static int getContainerRetryWindowMs(Config config) {
+ ClusterManagerConfig clusterManagerConfig = new
ClusterManagerConfig(config);
+ return clusterManagerConfig.getContainerRetryWindowMs();
+ }
+
+ public static int getMaxConcurrency(Config config) {
+ TaskConfig taskConfig = new TaskConfig(config);
+ return taskConfig.getMaxConcurrency();
+ }
+
+ public static int getMaxJvmHeapMb() {
+ Long maxJvmHeapMb = Runtime.getRuntime().maxMemory() / (1024 * 1024);
+ return maxJvmHeapMb.intValue();
+ }
Review comment:
Can DiagnosticsUtil.buildDiagnosticsManager also invoke this util
method, instead of calling Runtime.getRuntime().maxMemory directly ?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]