This is an automated email from the ASF dual-hosted git repository.
heejong pushed a commit to branch release-2.30.0
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/release-2.30.0 by this push:
new 0176b9d [BEAM-12273] Support non-multimap materialization in Twister2 runner
new fbd09de Merge pull request #14725 from kennknowles/twister2-cherrypick
0176b9d is described below
commit 0176b9d558ebff61ac189f100a8b3435a212d19a
Author: Kenneth Knowles <[email protected]>
AuthorDate: Mon May 3 14:10:23 2021 -0700
[BEAM-12273] Support non-multimap materialization in Twister2 runner
---
.../batch/PCollectionViewTranslatorBatch.java | 52 +++++++++++---
.../translators/functions/ByteToElemFunction.java | 79 ++++++++++++++++++++
.../translators/functions/ElemToBytesFunction.java | 84 ++++++++++++++++++++++
.../twister2/utils/Twister2SideInputReader.java | 76 +++++++++++++++-----
4 files changed, 262 insertions(+), 29 deletions(-)
diff --git a/runners/twister2/src/main/java/org/apache/beam/runners/twister2/translators/batch/PCollectionViewTranslatorBatch.java b/runners/twister2/src/main/java/org/apache/beam/runners/twister2/translators/batch/PCollectionViewTranslatorBatch.java
index a8c1771..9bc32fc 100644
--- a/runners/twister2/src/main/java/org/apache/beam/runners/twister2/translators/batch/PCollectionViewTranslatorBatch.java
+++ b/runners/twister2/src/main/java/org/apache/beam/runners/twister2/translators/batch/PCollectionViewTranslatorBatch.java
@@ -22,11 +22,14 @@ import java.io.IOException;
import
org.apache.beam.runners.core.construction.CreatePCollectionViewTranslation;
import org.apache.beam.runners.twister2.Twister2BatchTranslationContext;
import org.apache.beam.runners.twister2.translators.BatchTransformTranslator;
+import
org.apache.beam.runners.twister2.translators.functions.ByteToElemFunction;
import
org.apache.beam.runners.twister2.translators.functions.ByteToWindowFunctionPrimitive;
+import
org.apache.beam.runners.twister2.translators.functions.ElemToBytesFunction;
import
org.apache.beam.runners.twister2.translators.functions.MapToTupleFunction;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.transforms.Materializations;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
@@ -58,23 +61,47 @@ public class PCollectionViewTranslatorBatch<ElemT, ViewT>
context.getCurrentTransform();
org.apache.beam.sdk.values.PCollectionView<ViewT> input;
PCollection<ElemT> inputPCol = context.getInput(transform);
- final KvCoder coder = (KvCoder) inputPCol.getCoder();
- Coder inputKeyCoder = coder.getKeyCoder();
+ final Coder coder = inputPCol.getCoder();
WindowingStrategy windowingStrategy = inputPCol.getWindowingStrategy();
WindowFn windowFn = windowingStrategy.getWindowFn();
- final WindowedValue.WindowedValueCoder wvCoder =
- WindowedValue.FullWindowedValueCoder.of(coder.getValueCoder(), windowFn.windowCoder());
- BatchTSet<WindowedValue<ElemT>> inputGathered =
- inputDataSet
- .direct()
- .map(new MapToTupleFunction<>(inputKeyCoder, wvCoder))
- .allGather()
- .map(new ByteToWindowFunctionPrimitive(inputKeyCoder, wvCoder));
try {
input = CreatePCollectionViewTranslation.getView(application);
} catch (IOException e) {
throw new RuntimeException(e);
}
- context.setSideInputDataSet(input.getTagInternal().getId(), inputGathered);
+
+ // Encode, allGather and decode the side input in the layout its materialization needs:
+ // MULTIMAP views as key/value tuples, ITERABLE views as individually encoded elements.
+ switch (input.getViewFn().getMaterialization().getUrn()) {
+ case Materializations.MULTIMAP_MATERIALIZATION_URN:
+ KvCoder kvCoder = (KvCoder<?, ?>) coder;
+ final Coder keyCoder = kvCoder.getKeyCoder();
+ final WindowedValue.WindowedValueCoder kvwvCoder =
+ WindowedValue.FullWindowedValueCoder.of(
+ kvCoder.getValueCoder(), windowFn.windowCoder());
+ BatchTSet<WindowedValue<ElemT>> multimapMaterialization =
+ inputDataSet
+ .direct()
+ .map(new MapToTupleFunction<>(keyCoder, kvwvCoder))
+ .allGather()
+ .map(new ByteToWindowFunctionPrimitive(keyCoder, kvwvCoder));
+ context.setSideInputDataSet(input.getTagInternal().getId(), multimapMaterialization);
+ break;
+ case Materializations.ITERABLE_MATERIALIZATION_URN:
+ final WindowedValue.WindowedValueCoder wvCoder =
+ WindowedValue.FullWindowedValueCoder.of(coder, windowFn.windowCoder());
+ BatchTSet<WindowedValue<ElemT>> iterableMaterialization =
+ inputDataSet
+ .direct()
+ .map(new ElemToBytesFunction<>(wvCoder))
+ .allGather()
+ .map(new ByteToElemFunction(wvCoder));
+ context.setSideInputDataSet(input.getTagInternal().getId(), iterableMaterialization);
+ break;
+ default:
+ throw new UnsupportedOperationException(
+ "Unknown side input materialization "
+ + input.getViewFn().getMaterialization().getUrn());
+ }
}
}
diff --git a/runners/twister2/src/main/java/org/apache/beam/runners/twister2/translators/functions/ByteToElemFunction.java b/runners/twister2/src/main/java/org/apache/beam/runners/twister2/translators/functions/ByteToElemFunction.java
new file mode 100644
index 0000000..578225f
--- /dev/null
+++ b/runners/twister2/src/main/java/org/apache/beam/runners/twister2/translators/functions/ByteToElemFunction.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.twister2.translators.functions;
+
+import edu.iu.dsc.tws.api.tset.TSetContext;
+import edu.iu.dsc.tws.api.tset.fn.MapFunc;
+import java.io.ObjectStreamException;
+import org.apache.beam.runners.twister2.utils.TranslationUtils;
+import org.apache.beam.sdk.util.SerializableUtils;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
+
+/**
+ * Twister2 {@link MapFunc} that decodes a {@code byte[]} into a {@link WindowedValue} using a
+ * {@link WindowedValueCoder}; intended as the counterpart of {@link ElemToBytesFunction}.
+ *
+ * <p>The coder is carried as serialized bytes and rebuilt in {@link #prepare(TSetContext)}, so the
+ * transient coder field does not need to survive serialization of the function itself.
+ */
+@SuppressWarnings({
+  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+public class ByteToElemFunction<V> implements MapFunc<WindowedValue<V>, byte[]> {
+
+  /** Coder applied in {@link #map}; rebuilt from {@link #wvCoderBytes} by {@link #prepare}. */
+  private transient WindowedValueCoder<V> wvCoder;
+
+  private transient boolean isInitialized = false;
+
+  /** {@link SerializableUtils}-serialized form of {@link #wvCoder}. */
+  private byte[] wvCoderBytes;
+
+  public ByteToElemFunction() {
+    // non arg constructor needed for kryo
+    isInitialized = false;
+  }
+
+  /**
+   * Creates a function that decodes with {@code wvCoder}.
+   *
+   * @param wvCoder coder used to decode each input byte array
+   */
+  public ByteToElemFunction(final WindowedValueCoder<V> wvCoder) {
+    this.wvCoder = wvCoder;
+    this.wvCoderBytes = SerializableUtils.serializeToByteArray(wvCoder);
+  }
+
+  /**
+   * Decodes a single element.
+   *
+   * @param input bytes of a {@link WindowedValue} encoded with the same coder
+   * @return the decoded windowed value
+   */
+  @Override
+  public WindowedValue<V> map(byte[] input) {
+    return TranslationUtils.fromByteArray(input, wvCoder);
+  }
+
+  @Override
+  public void prepare(TSetContext context) {
+    initTransient();
+  }
+
+  /**
+   * Method used to initialize the transient variables that were sent over as byte arrays or proto
+   * buffers.
+   */
+  private void initTransient() {
+    if (isInitialized) {
+      return;
+    }
+    wvCoder =
+        (WindowedValueCoder<V>)
+            SerializableUtils.deserializeFromByteArray(wvCoderBytes, "Custom Coder Bytes");
+    this.isInitialized = true;
+  }
+
+  protected Object readResolve() throws ObjectStreamException {
+    return this;
+  }
+}
diff --git a/runners/twister2/src/main/java/org/apache/beam/runners/twister2/translators/functions/ElemToBytesFunction.java b/runners/twister2/src/main/java/org/apache/beam/runners/twister2/translators/functions/ElemToBytesFunction.java
new file mode 100644
index 0000000..c83acdd
--- /dev/null
+++ b/runners/twister2/src/main/java/org/apache/beam/runners/twister2/translators/functions/ElemToBytesFunction.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.twister2.translators.functions;
+
+import edu.iu.dsc.tws.api.tset.TSetContext;
+import edu.iu.dsc.tws.api.tset.fn.MapFunc;
+import java.io.ObjectStreamException;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.util.SerializableUtils;
+import org.apache.beam.sdk.util.WindowedValue;
+
+/**
+ * Twister2 {@link MapFunc} that encodes each {@link WindowedValue} into a {@code byte[]} with a
+ * windowed value coder; {@link ByteToElemFunction} is the intended decoding counterpart.
+ *
+ * <p>The coder is carried as serialized bytes and rebuilt in {@link #prepare(TSetContext)}, so the
+ * transient coder field does not need to survive serialization of the function itself.
+ */
+@SuppressWarnings({
+  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+public class ElemToBytesFunction<V> implements MapFunc<byte[], WindowedValue<V>> {
+
+  /** Coder applied in {@link #map}; rebuilt from {@link #wvCoderBytes} by {@link #prepare}. */
+  private transient WindowedValue.WindowedValueCoder<V> wvCoder;
+
+  private transient boolean isInitialized = false;
+
+  /** {@link SerializableUtils}-serialized form of {@link #wvCoder}. */
+  private byte[] wvCoderBytes;
+
+  public ElemToBytesFunction() {
+    // non arg constructor needed for kryo
+    this.isInitialized = false;
+  }
+
+  /**
+   * Creates a function that encodes with {@code wvCoder}.
+   *
+   * @param wvCoder coder used to encode each input element
+   */
+  public ElemToBytesFunction(WindowedValue.WindowedValueCoder<V> wvCoder) {
+    this.wvCoder = wvCoder;
+    this.wvCoderBytes = SerializableUtils.serializeToByteArray(wvCoder);
+  }
+
+  /**
+   * Encodes a single element.
+   *
+   * @param input the windowed value to encode
+   * @return the encoded bytes; never {@code null}
+   * @throws IllegalStateException if the coder fails to encode {@code input}
+   */
+  @Override
+  public byte[] map(WindowedValue<V> input) {
+    try {
+      return CoderUtils.encodeToByteArray(wvCoder, input);
+    } catch (CoderException e) {
+      // Fail here with the cause attached rather than emitting a null payload that a downstream
+      // decoder would only trip over later, far from the original error.
+      throw new IllegalStateException("Failed to encode element with coder " + wvCoder, e);
+    }
+  }
+
+  @Override
+  public void prepare(TSetContext context) {
+    initTransient();
+  }
+
+  /**
+   * Method used to initialize the transient variables that were sent over as byte arrays or proto
+   * buffers.
+   */
+  private void initTransient() {
+    if (isInitialized) {
+      return;
+    }
+    wvCoder =
+        (WindowedValue.WindowedValueCoder<V>)
+            SerializableUtils.deserializeFromByteArray(wvCoderBytes, "Coder");
+    this.isInitialized = true;
+  }
+
+  protected Object readResolve() throws ObjectStreamException {
+    return this;
+  }
+}
diff --git a/runners/twister2/src/main/java/org/apache/beam/runners/twister2/utils/Twister2SideInputReader.java b/runners/twister2/src/main/java/org/apache/beam/runners/twister2/utils/Twister2SideInputReader.java
index bbcd392..6ed77c7 100644
--- a/runners/twister2/src/main/java/org/apache/beam/runners/twister2/utils/Twister2SideInputReader.java
+++ b/runners/twister2/src/main/java/org/apache/beam/runners/twister2/utils/Twister2SideInputReader.java
@@ -23,6 +23,7 @@ import edu.iu.dsc.tws.api.dataset.DataPartition;
import edu.iu.dsc.tws.api.dataset.DataPartitionConsumer;
import edu.iu.dsc.tws.api.tset.TSetContext;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -31,11 +32,11 @@ import
org.apache.beam.runners.core.InMemoryMultimapSideInputView;
import org.apache.beam.runners.core.SideInputReader;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.Materializations;
import org.apache.beam.sdk.transforms.Materializations.MultimapView;
import org.apache.beam.sdk.transforms.ViewFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
@@ -75,40 +76,83 @@ public class Twister2SideInputReader implements SideInputReader {
}
private <T> T getSideInput(PCollectionView<T> view, BoundedWindow window) {
- Map<BoundedWindow, List<WindowedValue<KV<?, ?>>>> partitionedElements = new HashMap<>();
+ // Dispatch on the materialization URN declared by the view's ViewFn.
+ switch (view.getViewFn().getMaterialization().getUrn()) {
+ case Materializations.MULTIMAP_MATERIALIZATION_URN:
+ return getMultimapSideInput(view, window);
+ case Materializations.ITERABLE_MATERIALIZATION_URN:
+ return getIterableSideInput(view, window);
+ default:
+ throw new IllegalArgumentException(
+ "Unknown materialization type: " + view.getViewFn().getMaterialization().getUrn());
+ }
+ }
+
+ /** Builds the multimap-materialized value of {@code view} for {@code window}. */
+ private <T> T getMultimapSideInput(PCollectionView<T> view, BoundedWindow window) {
+ Map<BoundedWindow, List<WindowedValue<?>>> partitionedElements = getPartitionedElements(view);
+ Map<BoundedWindow, T> resultMap = new HashMap<>();
+
+ ViewFn<MultimapView, T> viewFn = (ViewFn<MultimapView, T>) view.getViewFn();
+ // The key coder does not depend on the window, so resolve it once outside the loop.
+ Coder keyCoder = ((KvCoder<?, ?>) view.getCoderInternal()).getKeyCoder();
+ for (Map.Entry<BoundedWindow, List<WindowedValue<?>>> elements :
+ partitionedElements.entrySet()) {
+ resultMap.put(
+ elements.getKey(),
+ viewFn.apply(
+ InMemoryMultimapSideInputView.fromIterable(
+ keyCoder,
+ (Iterable)
+ elements.getValue().stream()
+ .map(WindowedValue::getValue)
+ .collect(Collectors.toList()))));
+ }
+ T result = resultMap.get(window);
+ if (result == null) {
+ result = viewFn.apply(InMemoryMultimapSideInputView.empty());
+ }
+ return result;
+ }
+
+ /** Drains the gathered side input and groups each element under every window it belongs to. */
+ private Map<BoundedWindow, List<WindowedValue<?>>> getPartitionedElements(
+ PCollectionView<?> view) {
+ Map<BoundedWindow, List<WindowedValue<?>>> partitionedElements = new HashMap<>();
DataPartition<?> sideInput = runtimeContext.getInput(view.getTagInternal().getId());
DataPartitionConsumer<?> dataPartitionConsumer = sideInput.getConsumer();
while (dataPartitionConsumer.hasNext()) {
- WindowedValue<KV<?, ?>> winValue = (WindowedValue<KV<?, ?>>) dataPartitionConsumer.next();
+ WindowedValue<?> winValue = (WindowedValue<?>) dataPartitionConsumer.next();
for (BoundedWindow tbw : winValue.getWindows()) {
- List<WindowedValue<KV<?, ?>>> windowedValues =
+ List<WindowedValue<?>> windowedValues =
partitionedElements.computeIfAbsent(tbw, k -> new ArrayList<>());
windowedValues.add(winValue);
}
}
+ return partitionedElements;
+ }
+
+ /** Builds the iterable-materialized value of {@code view} for {@code window}. */
+ private <T> T getIterableSideInput(PCollectionView<T> view, BoundedWindow window) {
+ Map<BoundedWindow, List<WindowedValue<?>>> partitionedElements = getPartitionedElements(view);
+ ViewFn<Materializations.IterableView, T> viewFn =
+ (ViewFn<Materializations.IterableView, T>) view.getViewFn();
Map<BoundedWindow, T> resultMap = new HashMap<>();
- for (Map.Entry<BoundedWindow, List<WindowedValue<KV<?, ?>>>> elements :
+ for (Map.Entry<BoundedWindow, List<WindowedValue<?>>> elements :
partitionedElements.entrySet()) {
-
- ViewFn<MultimapView, T> viewFn = (ViewFn<MultimapView, T>) view.getViewFn();
- Coder keyCoder = ((KvCoder<?, ?>) view.getCoderInternal()).getKeyCoder();
resultMap.put(
elements.getKey(),
- (T)
- viewFn.apply(
- InMemoryMultimapSideInputView.fromIterable(
- keyCoder,
- (Iterable)
- elements.getValue().stream()
- .map(WindowedValue::getValue)
- .collect(Collectors.toList()))));
+ viewFn.apply(
+ () ->
+ elements.getValue().stream()
+ .map(WindowedValue::getValue)
+ .collect(Collectors.toList())));
}
T result = resultMap.get(window);
if (result == null) {
- ViewFn<MultimapView, T> viewFn = (ViewFn<MultimapView, T>) view.getViewFn();
- result = viewFn.apply(InMemoryMultimapSideInputView.empty());
+ result = viewFn.apply(() -> Collections.emptyList());
}
return result;
}