This is an automated email from the ASF dual-hosted git repository.
altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 7df1551 Use the maxWorkItemCommitBytes value returned in the
StreamingConfigTask, if there is one.
new 5a1dc26 Merge pull request #8033 from drieber/maxWorkItemCommitBytes
7df1551 is described below
commit 7df1551778a58a76f345eca39eb5c70ec4848912
Author: David Rieber <[email protected]>
AuthorDate: Mon Mar 11 15:49:21 2019 -0700
Use the maxWorkItemCommitBytes value returned in the StreamingConfigTask,
if there is one.
---
.../runners/dataflow/worker/StreamingDataflowWorker.java | 14 +++++++++++++-
1 file changed, 13 insertions(+), 1 deletion(-)
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index 38d8349..cea383d 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -695,6 +695,7 @@ public class StreamingDataflowWorker {
LOG.debug("WindmillServiceEndpoint: {}",
options.getWindmillServiceEndpoint());
LOG.debug("WindmillServicePort: {}", options.getWindmillServicePort());
LOG.debug("LocalWindmillHostport: {}", options.getLocalWindmillHostport());
+ LOG.debug("maxWorkItemCommitBytes: {}", maxWorkItemCommitBytes);
}
private Node createPortNode(String predecessorId, String successorId) {
@@ -726,6 +727,9 @@ public class StreamingDataflowWorker {
@VisibleForTesting
public void setMaxWorkItemCommitBytes(int maxWorkItemCommitBytes) {
+ if (maxWorkItemCommitBytes != this.maxWorkItemCommitBytes) {
+ LOG.info("Setting maxWorkItemCommitBytes to {}", maxWorkItemCommitBytes);
+ }
this.maxWorkItemCommitBytes = maxWorkItemCommitBytes;
}
@@ -1585,12 +1589,20 @@ public class StreamingDataflowWorker {
if (workItem == null || !workItem.isPresent() || workItem.get() == null) {
return;
}
- setMaxWorkItemCommitBytes(180 << 20);
StreamingConfigTask config = workItem.get().getStreamingConfigTask();
Preconditions.checkState(config != null);
if (config.getUserStepToStateFamilyNameMap() != null) {
stateNameMap.putAll(config.getUserStepToStateFamilyNameMap());
}
+ if (computation == null) {
+ if (config.getMaxWorkItemCommitBytes() != null
+ && config.getMaxWorkItemCommitBytes() > 0
+ && config.getMaxWorkItemCommitBytes() <= Integer.MAX_VALUE) {
+
setMaxWorkItemCommitBytes(config.getMaxWorkItemCommitBytes().intValue());
+ } else {
+ setMaxWorkItemCommitBytes(180 << 20);
+ }
+ }
List<StreamingComputationConfig> configs =
config.getStreamingComputationConfigs();
if (configs != null) {
for (StreamingComputationConfig computationConfig : configs) {