rmatharu commented on a change in pull request #1011: SAMZA-2172: Make High
Level applications respect job.container.thread.pool.size
URL: https://github.com/apache/samza/pull/1011#discussion_r279104363
##########
File path: samza-core/src/main/java/org/apache/samza/task/RunLoop.java
##########
@@ -457,17 +457,25 @@ private void process() {
final IncomingMessageEnvelope envelope = state.fetchEnvelope();
log.trace("Process ssp {} offset {}",
envelope.getSystemStreamPartition(), envelope.getOffset());
- final ReadableCoordinator coordinator = new
ReadableCoordinator(task.taskName());
- TaskCallbackFactory callbackFactory = new TaskCallbackFactory() {
- @Override
- public TaskCallback createCallback() {
- state.startProcess();
- containerMetrics.processes().inc();
- return callbackManager.createCallback(task.taskName(), envelope,
coordinator);
- }
+ Runnable processWorker = () -> {
+ final ReadableCoordinator coordinator = new
ReadableCoordinator(task.taskName());
+ TaskCallbackFactory callbackFactory = new TaskCallbackFactory() {
+ @Override
+ public TaskCallback createCallback() {
+ state.startProcess();
+ containerMetrics.processes().inc();
+ return callbackManager.createCallback(task.taskName(), envelope,
coordinator);
+ }
+ };
+
+ task.process(envelope, coordinator, callbackFactory);
};
- task.process(envelope, coordinator, callbackFactory);
+ if (threadPool != null) {
Review comment:
This is a bad pattern in code. Can we fix atleast this instance of it.
This threadPool is being passed from SamzaContainer as a null.
```val taskThreadPool = if (threadPoolSize > 0) {
Executors.newFixedThreadPool(threadPoolSize,
new ThreadFactoryBuilder().setNameFormat("Samza Container
Thread-%d").build())
} else {
null
}```
a. We can either push the thread-pool creation down to the place its
actually being used.
b. Alternatively, can create it as a single-threaded executor pool.
c. or pass it as an optional.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services