jia-gao commented on code in PR #26314:
URL: https://github.com/apache/beam/pull/26314#discussion_r1170561739


##########
runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigBuilder.java:
##########
@@ -131,6 +117,30 @@ public Config build() {
     }
   }
 
+  @VisibleForTesting
+  static Map<String, String> createBundleConfig(
+      SamzaPipelineOptions options, Map<String, String> config) {
+    ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
+    builder.put(MAX_CONCURRENCY, String.valueOf(options.getMaxBundleSize()));
+
+    if (options.getMaxBundleSize() > 1) {
+      final int threadPoolSize = ConfigUtils.asJobConfig(config).getThreadPoolSize();
+      LOG.info("Remove threadPoolSize configs when maxBundleSize > 1");
+      builder.put(JOB_CONTAINER_THREAD_POOL_SIZE, "0");
+      builder.put(JOB_AUTOSIZING_CONTAINER_THREAD_POOL_SIZE, "0");
+
+      if (threadPoolSize > 1 && options.getNumThreadsForProcessElement() <= 1) {
+        // In case the user sets the thread pool through samza config instead options,
+        // set the bundle thread pool size based on container thread pool config
+        // this allows Samza auto-sizing to tune the threads
+        LOG.info("Convert threadPoolSize {} to numThreadsForProcessElement", threadPoolSize);
+        // NumThreadsForProcessElement in option is the source of truth
+        options.setNumThreadsForProcessElement(threadPoolSize);

Review Comment:
   To clarify, `job.container.thread.pool.size` won't work if auto-sizing is enabled.
   Auto-sizing today lets users override the size configs while leaving auto-sizing enabled (asc can still resize the job). It works as follows:
   1. Users can configure `job.autosizing.container.thread.pool.size` (though this is not recommended). It is honored when the job starts.
   2. When auto-sizing resizes the job, it submits the job to the jetty proxy with a new value of `job.autosizing.container.thread.pool.size` included in the submission config, and writes that value to the coordinator stream.
   3. If the user redeploys the job, the size from 1 is still honored. If the user removes the config from 1 and redeploys, the size from 2 is honored (read from the coordinator stream).
   
   For Beam job options like ThreadsForProcessElement, asc is not able to achieve 2, so I agree that in ConfigBuilder we should honor `job.autosizing.container.thread.pool.size` over options specified in code, so that the job can still be resized. After all, if `job.autosizing.container.thread.pool.size` is present, it means either asc resized the job or the job owner wants to override the size while leaving auto-sizing on.
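
   A rough sketch of the precedence I have in mind, in case it helps the discussion. `ThreadPoolSizeResolver` and its `resolve` method are hypothetical names, not existing Beam or Samza code, and the two maps are simplified stand-ins for the submitted job config and the values read back from the coordinator stream:

```java
import java.util.Map;

// Hypothetical helper for illustration only; not part of Beam or Samza.
final class ThreadPoolSizeResolver {
  static final String AUTOSIZING_THREAD_POOL_SIZE =
      "job.autosizing.container.thread.pool.size";

  private ThreadPoolSizeResolver() {}

  /**
   * Picks the effective thread count:
   * 1. the value the user put in the submitted config, if present;
   * 2. otherwise the value auto-sizing wrote to the coordinator stream, if present;
   * 3. otherwise the value from the pipeline option set in code.
   */
  static int resolve(
      Map<String, String> submittedConfig,
      Map<String, String> coordinatorStreamConfig,
      int optionThreads) {
    String fromUser = submittedConfig.get(AUTOSIZING_THREAD_POOL_SIZE);
    if (fromUser != null) {
      return Integer.parseInt(fromUser);
    }
    String fromAutoSizing = coordinatorStreamConfig.get(AUTOSIZING_THREAD_POOL_SIZE);
    if (fromAutoSizing != null) {
      return Integer.parseInt(fromAutoSizing);
    }
    return optionThreads;
  }
}
```

   The point is only the ordering: the option set in code is the fallback, so a value coming from the autosizing config can always take effect.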
   


