mynameborat commented on code in PR #26314:
URL: https://github.com/apache/beam/pull/26314#discussion_r1170412724


##########
runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigBuilder.java:
##########
@@ -131,6 +117,30 @@ public Config build() {
     }
   }
 
+  @VisibleForTesting
+  static Map<String, String> createBundleConfig(
+      SamzaPipelineOptions options, Map<String, String> config) {
+    ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
+    builder.put(MAX_CONCURRENCY, String.valueOf(options.getMaxBundleSize()));
+
+    if (options.getMaxBundleSize() > 1) {
+      final int threadPoolSize = ConfigUtils.asJobConfig(config).getThreadPoolSize();
+      LOG.info("Remove threadPoolSize configs when maxBundleSize > 1");
+      builder.put(JOB_CONTAINER_THREAD_POOL_SIZE, "0");
+      builder.put(JOB_AUTOSIZING_CONTAINER_THREAD_POOL_SIZE, "0");
+
+      if (threadPoolSize > 1 && options.getNumThreadsForProcessElement() <= 1) {
+        // In case the user sets the thread pool through samza config instead options,
+        // set the bundle thread pool size based on container thread pool config
+        // this allows Samza auto-sizing to tune the threads
+        LOG.info("Convert threadPoolSize {} to numThreadsForProcessElement", threadPoolSize);
+        // NumThreadsForProcessElement in option is the source of truth
+        options.setNumThreadsForProcessElement(threadPoolSize);

Review Comment:
   Shouldn't autosizing take precedence over user configuration? That is,
   if autosizing is enabled, we'd also need to set the threadPoolSize
   regardless of the user's `getNumThreadsForProcessElement()`.
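
   A rough sketch of the precedence I have in mind, not a proposed patch.
   It uses plain maps instead of `SamzaPipelineOptions` and `JobConfig`,
   and the class name, method name, and the three config key strings are
   assumptions made for illustration only; please check them against the
   real `JobConfig` constants.

```java
import java.util.HashMap;
import java.util.Map;

public class BundleThreadSketch {
  // Assumed key names, for illustration only; verify against Samza's JobConfig.
  static final String AUTOSIZING_ENABLED = "job.autosizing.enabled";
  static final String AUTOSIZING_THREAD_POOL_SIZE = "job.autosizing.container.thread.pool.size";
  static final String THREAD_POOL_SIZE = "job.container.thread.pool.size";

  /**
   * Hypothetical helper: picks the number of bundle threads.
   * The auto-sizer value wins whenever autosizing is enabled and has produced a value;
   * otherwise the existing rule from the diff applies.
   */
  static int resolveNumThreads(Map<String, String> config, int userNumThreads) {
    boolean autosizing = Boolean.parseBoolean(config.getOrDefault(AUTOSIZING_ENABLED, "false"));
    if (autosizing && config.containsKey(AUTOSIZING_THREAD_POOL_SIZE)) {
      // Autosizing takes precedence over the user's option.
      return Integer.parseInt(config.get(AUTOSIZING_THREAD_POOL_SIZE));
    }
    int threadPoolSize = Integer.parseInt(config.getOrDefault(THREAD_POOL_SIZE, "0"));
    if (threadPoolSize > 1 && userNumThreads <= 1) {
      // Same condition as in the diff: fall back to the container thread pool config.
      return threadPoolSize;
    }
    return userNumThreads;
  }

  public static void main(String[] args) {
    Map<String, String> config = new HashMap<>();
    config.put(AUTOSIZING_ENABLED, "true");
    config.put(AUTOSIZING_THREAD_POOL_SIZE, "8");
    // The user asked for 4 threads, but the auto-sizer value is used instead.
    System.out.println(resolveNumThreads(config, 4));
  }
}
```

   With autosizing disabled, the helper behaves like the current change:
   the user's option is kept unless only the Samza thread pool config is set.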



##########
runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigBuilder.java:
##########
@@ -131,6 +117,30 @@ public Config build() {
     }
   }
 
+  @VisibleForTesting
+  static Map<String, String> createBundleConfig(
+      SamzaPipelineOptions options, Map<String, String> config) {
+    ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
+    builder.put(MAX_CONCURRENCY, String.valueOf(options.getMaxBundleSize()));
+
+    if (options.getMaxBundleSize() > 1) {
+      final int threadPoolSize = ConfigUtils.asJobConfig(config).getThreadPoolSize();
+      LOG.info("Remove threadPoolSize configs when maxBundleSize > 1");
+      builder.put(JOB_CONTAINER_THREAD_POOL_SIZE, "0");
+      builder.put(JOB_AUTOSIZING_CONTAINER_THREAD_POOL_SIZE, "0");

Review Comment:
   Can we add a note on why we reset the value that is set by the auto-sizer?



##########
runners/samza/src/test/java/org/apache/beam/runners/samza/translation/ConfigGeneratorTest.java:
##########
@@ -414,4 +417,39 @@ public void processElement(
         "TestStoreConfig-1-testState-Same_stateful_ParDo_Name2-changelog",
         config2.get("stores.testState-Same_stateful_ParDo_Name2.changelog"));
   }
+
+  @Test
+  public void testCreateBundleConfig() {

Review Comment:
   Reminder to add another test with autosizing enabled vs. disabled.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
