[
https://issues.apache.org/jira/browse/BEAM-6253?focusedWorklogId=176658&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-176658
]
ASF GitHub Bot logged work on BEAM-6253:
----------------------------------------
Author: ASF GitHub Bot
Created on: 18/Dec/18 18:28
Start Date: 18/Dec/18 18:28
Worklog Time Spent: 10m
Work Description: xinyuiscool closed pull request #7302: [BEAM-6253]
SamzaRunner: Add a few customized transforms for runner use cases
URL: https://github.com/apache/beam/pull/7302
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a merged pull request from a
fork, the diff is reproduced below for the sake of provenance:
diff --git
a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java
b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java
index 58739f78380c..c2a8d0a6e7ed 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java
@@ -77,7 +77,7 @@ public SamzaPipelineResult run(Pipeline pipeline) {
final Map<PValue, String> idMap = PViewToIdMapper.buildIdMap(pipeline);
final ConfigBuilder configBuilder = new ConfigBuilder(options);
- SamzaPipelineTranslator.createConfig(pipeline, idMap, configBuilder);
+ SamzaPipelineTranslator.createConfig(pipeline, options, idMap,
configBuilder);
final ApplicationRunner runner =
ApplicationRunner.fromConfig(configBuilder.build());
final SamzaExecutionContext executionContext = new SamzaExecutionContext();
diff --git
a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java
b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java
index be299ad64d82..c6fab218536f 100644
---
a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java
+++
b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java
@@ -42,6 +42,7 @@
import org.apache.beam.runners.core.StateTag;
import org.apache.beam.runners.samza.state.SamzaMapState;
import org.apache.beam.runners.samza.state.SamzaSetState;
+import org.apache.beam.runners.samza.transforms.UpdatingCombineFn;
import org.apache.beam.sdk.coders.BooleanCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.InstantCoder;
@@ -805,7 +806,14 @@ public AccumT mergeAccumulators(Iterable<AccumT>
accumulators) {
@Override
@Nonnull
public OutT read() {
- return combineFn.extractOutput(getAccum());
+ AccumT accum = getAccum();
+ OutT output = combineFn.extractOutput(accum);
+ if (combineFn instanceof UpdatingCombineFn) {
+ AccumT updatedAccum =
+ ((UpdatingCombineFn<InT, AccumT, OutT>)
combineFn).updateAfterFiring(accum);
+ writeInternal(updatedAccum);
+ }
+ return output;
}
}
diff --git
a/runners/samza/src/main/java/org/apache/beam/runners/samza/transforms/GroupWithoutRepartition.java
b/runners/samza/src/main/java/org/apache/beam/runners/samza/transforms/GroupWithoutRepartition.java
new file mode 100644
index 000000000000..c0ce52e34c09
--- /dev/null
+++
b/runners/samza/src/main/java/org/apache/beam/runners/samza/transforms/GroupWithoutRepartition.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.transforms;
+
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+
+/**
+ * A wrapper transform of {@link org.apache.beam.sdk.transforms.GroupByKey} or
{@link
+ * org.apache.beam.sdk.transforms.join.CoGroupByKey} to indicate there is no
repartition needed for
+ * Samza runner. For example:
+ *
+ * <p>input.apply(GroupWithoutRepartition.of(Count.perKey()));
+ */
+public class GroupWithoutRepartition<InputT extends PInput, OutputT extends
POutput>
+ extends PTransform<InputT, OutputT> {
+ private final PTransform<InputT, OutputT> transform;
+
+ public static <InputT extends PInput, OutputT extends POutput>
+ GroupWithoutRepartition<InputT, OutputT> of(PTransform<InputT, OutputT>
transform) {
+ return new GroupWithoutRepartition<>(transform);
+ }
+
+ private GroupWithoutRepartition(PTransform<InputT, OutputT> transform) {
+ this.transform = transform;
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public OutputT expand(InputT input) {
+ if (input instanceof PCollection) {
+ return (OutputT) ((PCollection) input).apply(transform);
+ } else if (input instanceof KeyedPCollectionTuple) {
+ return (OutputT) ((KeyedPCollectionTuple) input).apply(transform);
+ } else {
+ throw new RuntimeException(
+ transform.getName()
+ + " is not supported with "
+ + GroupWithoutRepartition.class.getSimpleName());
+ }
+ }
+}
diff --git
a/runners/samza/src/main/java/org/apache/beam/runners/samza/transforms/UpdatingCombineFn.java
b/runners/samza/src/main/java/org/apache/beam/runners/samza/transforms/UpdatingCombineFn.java
new file mode 100644
index 000000000000..f0a7e5e1ceaa
--- /dev/null
+++
b/runners/samza/src/main/java/org/apache/beam/runners/samza/transforms/UpdatingCombineFn.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.transforms;
+
+import org.apache.beam.sdk.transforms.Combine;
+
+/**
+ * Currently Beam only supports either throw away the accumulation or keep it
after firing. This
+ * CombineFn allows more flexibility to update the accumulation.
+ */
+public abstract class UpdatingCombineFn<InputT, AccumT, OutputT>
+ extends Combine.CombineFn<InputT, AccumT, OutputT> {
+
+ /**
+ * Returns an updated accumulator from the given accumulator after firing a
window pane.
+ *
+ * <p>For efficiency, the input accumulator may be modified and returned.
+ */
+ public abstract AccumT updateAfterFiring(AccumT accumulator);
+}
diff --git
a/runners/samza/src/main/java/org/apache/beam/runners/samza/transforms/package-info.java
b/runners/samza/src/main/java/org/apache/beam/runners/samza/transforms/package-info.java
new file mode 100644
index 000000000000..292e7563081a
--- /dev/null
+++
b/runners/samza/src/main/java/org/apache/beam/runners/samza/transforms/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Internal implementation of the Beam runner for Apache Samza. */
+package org.apache.beam.runners.samza.transforms;
diff --git
a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigContext.java
b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigContext.java
index 00f5233d7657..5761bf847d27 100644
---
a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigContext.java
+++
b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigContext.java
@@ -19,6 +19,7 @@
import com.google.common.collect.Iterables;
import java.util.Map;
+import org.apache.beam.runners.samza.SamzaPipelineOptions;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.transforms.PTransform;
@@ -28,9 +29,11 @@
public class ConfigContext {
private final Map<PValue, String> idMap;
private AppliedPTransform<?, ?, ?> currentTransform;
+ private final SamzaPipelineOptions options;
- public ConfigContext(Map<PValue, String> idMap) {
+ public ConfigContext(Map<PValue, String> idMap, SamzaPipelineOptions
options) {
this.idMap = idMap;
+ this.options = options;
}
public void setCurrentTransform(AppliedPTransform<?, ?, ?> currentTransform)
{
@@ -50,6 +53,10 @@ public String getOutputId(TransformHierarchy.Node node) {
return
getIdForPValue(Iterables.getOnlyElement(node.getOutputs().values()));
}
+ public SamzaPipelineOptions getPipelineOptions() {
+ return this.options;
+ }
+
private String getIdForPValue(PValue pvalue) {
final String id = idMap.get(pvalue);
if (id == null) {
diff --git
a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/GroupByKeyTranslator.java
b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/GroupByKeyTranslator.java
index 4b86d47a4428..fce3207ca772 100644
---
a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/GroupByKeyTranslator.java
+++
b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/GroupByKeyTranslator.java
@@ -25,6 +25,7 @@
import org.apache.beam.runners.samza.runtime.KvToKeyedWorkItemOp;
import org.apache.beam.runners.samza.runtime.OpAdapter;
import org.apache.beam.runners.samza.runtime.OpMessage;
+import org.apache.beam.runners.samza.transforms.GroupWithoutRepartition;
import org.apache.beam.runners.samza.util.SamzaCoders;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
@@ -73,8 +74,7 @@ public void translate(
inputStream.filter(msg -> msg.getType() == OpMessage.Type.ELEMENT);
final MessageStream<OpMessage<KV<K, InputT>>> partitionedInputStream;
- if (ctx.getPipelineOptions().getMaxSourceParallelism() == 1) {
- // Only one task will be created, no need for repartition
+ if (!needRepartition(node, ctx)) {
partitionedInputStream = filteredInputStream;
} else {
partitionedInputStream =
@@ -134,4 +134,22 @@ public void translate(
throw new RuntimeException("Transform " + transform + " cannot be
translated as GroupByKey.");
}
}
+
+ private boolean needRepartition(TransformHierarchy.Node node,
TranslationContext ctx) {
+
+ if (ctx.getPipelineOptions().getMaxSourceParallelism() == 1) {
+ // Only one task will be created, no need for repartition
+ return false;
+ }
+
+ if (node == null) {
+ return true;
+ }
+
+ if (node.getTransform() instanceof GroupWithoutRepartition) {
+ return false;
+ } else {
+ return needRepartition(node.getEnclosingNode(), ctx);
+ }
+ }
}
diff --git
a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPipelineTranslator.java
b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPipelineTranslator.java
index f498d641784d..f97fc0dac6a8 100644
---
a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPipelineTranslator.java
+++
b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPipelineTranslator.java
@@ -84,8 +84,11 @@ public static void translate(
}
public static void createConfig(
- Pipeline pipeline, Map<PValue, String> idMap, ConfigBuilder
configBuilder) {
- final ConfigContext ctx = new ConfigContext(idMap);
+ Pipeline pipeline,
+ SamzaPipelineOptions options,
+ Map<PValue, String> idMap,
+ ConfigBuilder configBuilder) {
+ final ConfigContext ctx = new ConfigContext(idMap, options);
final TransformVisitorFn configFn =
new TransformVisitorFn() {
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
Issue Time Tracking
-------------------
Worklog Id: (was: 176658)
Time Spent: 1h (was: 50m)
> SamzaRunner: Add a few customized transforms for runner use cases
> -----------------------------------------------------------------
>
> Key: BEAM-6253
> URL: https://issues.apache.org/jira/browse/BEAM-6253
> Project: Beam
> Issue Type: Improvement
> Components: runner-samza
> Reporter: Xinyu Liu
> Assignee: Xinyu Liu
> Priority: Major
> Time Spent: 1h
> Remaining Estimate: 0h
>
> SamzaRunner needs a few custom transforms to support the following
> use cases:
> - allow GroupByKey/CoGroupByKey to run without repartitioning
> - allow updating the accumulator after a window pane fires
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)