[
https://issues.apache.org/jira/browse/BEAM-4410?focusedWorklogId=109393&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-109393
]
ASF GitHub Bot logged work on BEAM-4410:
----------------------------------------
Author: ASF GitHub Bot
Created on: 06/Jun/18 14:14
Start Date: 06/Jun/18 14:14
Worklog Time Spent: 10m
Work Description: jbonofre closed pull request #5550: [BEAM-4410] added
Broadcast Join translation
URL: https://github.com/apache/beam/pull/5550
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/sdks/java/extensions/euphoria/euphoria-beam/build.gradle
b/sdks/java/extensions/euphoria/euphoria-beam/build.gradle
index 60d35bdc101..cbfb18483ce 100644
--- a/sdks/java/extensions/euphoria/euphoria-beam/build.gradle
+++ b/sdks/java/extensions/euphoria/euphoria-beam/build.gradle
@@ -19,6 +19,8 @@
apply from: project(":").file("build_rules.gradle")
applyJavaNature()
+description = "Apache Beam :: SDKs :: Java :: Extensions :: Euphoria Java 8
DSL"
+
ext {
kryoVersion = '2.24.0'
}
@@ -35,4 +37,11 @@ dependencies {
testCompile library.java.hamcrest_core
}
+test {
+ // Tests in testkit are not designed to run as standalone tests. They need
a context to run,
+ // which is provided by
org.apache.beam.sdk.extensions.euphoria.beam.OperatorsTestSuite.
+ exclude 'org/apache/beam/sdk/extensions/euphoria/beam/testkit/**'
+ include "**/*TestSuite.class"
+}
+
test.testLogging.showStandardStreams = true
diff --git
a/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/org/apache/beam/sdk/extensions/euphoria/beam/BrodcastHashJoinTranslator.java
b/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/org/apache/beam/sdk/extensions/euphoria/beam/BrodcastHashJoinTranslator.java
new file mode 100644
index 00000000000..7b07dc598fa
--- /dev/null
+++
b/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/org/apache/beam/sdk/extensions/euphoria/beam/BrodcastHashJoinTranslator.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.euphoria.beam;
+
+import static
org.apache.beam.sdk.extensions.euphoria.beam.common.OperatorTranslatorUtil.getKVInputCollection;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Map;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.extensions.euphoria.beam.io.KryoCoder;
+import org.apache.beam.sdk.extensions.euphoria.beam.window.BeamWindowing;
+import org.apache.beam.sdk.extensions.euphoria.core.client.dataset.Dataset;
+import
org.apache.beam.sdk.extensions.euphoria.core.client.dataset.windowing.Window;
+import
org.apache.beam.sdk.extensions.euphoria.core.client.dataset.windowing.Windowing;
+import
org.apache.beam.sdk.extensions.euphoria.core.client.functional.BinaryFunctor;
+import org.apache.beam.sdk.extensions.euphoria.core.client.operator.Join;
+import org.apache.beam.sdk.extensions.euphoria.core.client.operator.Operator;
+import
org.apache.beam.sdk.extensions.euphoria.core.client.operator.hint.SizeHint;
+import org.apache.beam.sdk.extensions.euphoria.core.client.util.Pair;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+
+/**
+ * Translator for {@link
org.apache.beam.sdk.extensions.euphoria.core.client.operator.RightJoin} and
+ * {@link
org.apache.beam.sdk.extensions.euphoria.core.client.operator.LeftJoin} when one
side of
+ * the join fits in memory so it can be broadcast as a multimap side input to
the other side.
+ */
+public class BrodcastHashJoinTranslator implements OperatorTranslator<Join> {
+
+ public static boolean hasFitsInMemoryHint(Operator operator) {
+ return operator != null
+ && operator.getHints() != null
+ && operator.getHints().contains(SizeHint.FITS_IN_MEMORY);
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public PCollection<?> translate(Join operator, BeamExecutorContext context) {
+ return doTranslate(operator, context);
+ }
+
+ <K, LeftT, RightT, OutputT, W extends Window<W>> PCollection<Pair<K,
OutputT>> doTranslate(
+ Join<LeftT, RightT, K, OutputT, W> operator, BeamExecutorContext
context) {
+ Coder<K> keyCoder = context.getCoder(operator.getLeftKeyExtractor());
+
+ @SuppressWarnings("unchecked") final PCollection<LeftT> left =
(PCollection<LeftT>) context
+ .getInputs(operator).get(0);
+ @SuppressWarnings("unchecked") final PCollection<RightT> right =
(PCollection<RightT>) context
+ .getInputs(operator).get(1);
+
+ final PCollection<KV<K, LeftT>> leftKvInput =
+ getKVInputCollection(
+ left,
+ operator.getLeftKeyExtractor(),
+ keyCoder,
+ new KryoCoder<>(),
+ ":extract-keys-left");
+
+ final PCollection<KV<K, RightT>> rightKvInput =
+ getKVInputCollection(
+ right,
+ operator.getRightKeyExtractor(),
+ keyCoder,
+ new KryoCoder<>(),
+ ":extract-keys-right");
+
+ switch (operator.getType()) {
+ case LEFT:
+ final PCollectionView<Map<K, Iterable<RightT>>> broadcastRight =
+ rightKvInput.apply(View.asMultimap());
+ return leftKvInput.apply(
+ ParDo.of(new BroadcastHashLeftJoinFn<>(broadcastRight,
operator.getJoiner()))
+ .withSideInputs(broadcastRight));
+
+ case RIGHT:
+ final PCollectionView<Map<K, Iterable<LeftT>>> broadcastLeft =
+ leftKvInput.apply(View.asMultimap());
+ return rightKvInput.apply(
+ ParDo.of(new BroadcastHashRightJoinFn<>(broadcastLeft,
operator.getJoiner()))
+ .withSideInputs(broadcastLeft));
+
+ default:
+ throw new UnsupportedOperationException(
+ String.format(
+ "Cannot translate Euphoria '%s' operator to Beam
transformations."
+ + " Given join type '%s' is not supported for
BrodcastHashJoin.",
+ Join.class.getSimpleName(), operator.getType()));
+ }
+ }
+
+ /**
+ * Determines whether the given {@link Join} operator is of the right type to be
translated to
+ * a broadcast hash join.
+ */
+ @Override
+ public boolean canTranslate(Join operator) {
+ @SuppressWarnings("unchecked") final ArrayList<Dataset> inputs = new
ArrayList(
+ operator.listInputs());
+ if (inputs.size() != 2) {
+ return false;
+ }
+ final Dataset leftDataset = inputs.get(0);
+ final Dataset rightDataset = inputs.get(1);
+ return (operator.getType() == Join.Type.LEFT &&
hasFitsInMemoryHint(rightDataset.getProducer())
+ || operator.getType() == Join.Type.RIGHT
+ && hasFitsInMemoryHint(leftDataset.getProducer()))
+ && isAllowedWindowing(operator.getWindowing());
+ }
+
+ /**
+ * BroadcastHashJoin supports only the global window or no windowing at all.
+ */
+ private boolean isAllowedWindowing(Windowing windowing) {
+ return windowing == null
+ || (windowing instanceof BeamWindowing
+ && ((BeamWindowing) windowing).getWindowFn() instanceof GlobalWindows);
+ }
+
+ static class BroadcastHashRightJoinFn<K, LeftT, RightT, OutputT>
+ extends DoFn<KV<K, RightT>, Pair<K, OutputT>> {
+
+ private final PCollectionView<Map<K, Iterable<LeftT>>> smallSideCollection;
+ private final BinaryFunctor<LeftT, RightT, OutputT> joiner;
+ private final SingleValueCollector<OutputT> outCollector = new
SingleValueCollector<>();
+
+ BroadcastHashRightJoinFn(
+ PCollectionView<Map<K, Iterable<LeftT>>> smallSideCollection,
+ BinaryFunctor<LeftT, RightT, OutputT> joiner) {
+ this.smallSideCollection = smallSideCollection;
+ this.joiner = joiner;
+ }
+
+ @SuppressWarnings("unused")
+ @ProcessElement
+ public void processElement(ProcessContext context) {
+ final K key = context.element().getKey();
+ final Map<K, Iterable<LeftT>> map =
context.sideInput(smallSideCollection);
+ final Iterable<LeftT> leftValues = map.getOrDefault(key,
Collections.singletonList(null));
+ leftValues.forEach(
+ leftValue -> {
+ joiner.apply(leftValue, context.element().getValue(),
outCollector);
+ context.output(Pair.of(key, outCollector.get()));
+ });
+ }
+ }
+
+ static class BroadcastHashLeftJoinFn<K, LeftT, RightT, OutputT>
+ extends DoFn<KV<K, LeftT>, Pair<K, OutputT>> {
+
+ private final PCollectionView<Map<K, Iterable<RightT>>>
smallSideCollection;
+ private final BinaryFunctor<LeftT, RightT, OutputT> joiner;
+ private final SingleValueCollector<OutputT> outCollector = new
SingleValueCollector<>();
+
+ BroadcastHashLeftJoinFn(
+ PCollectionView<Map<K, Iterable<RightT>>> smallSideCollection,
+ BinaryFunctor<LeftT, RightT, OutputT> joiner) {
+ this.smallSideCollection = smallSideCollection;
+ this.joiner = joiner;
+ }
+
+ @SuppressWarnings("unused")
+ @ProcessElement
+ public void processElement(ProcessContext context) {
+ final K key = context.element().getKey();
+ final Map<K, Iterable<RightT>> map =
context.sideInput(smallSideCollection);
+ final Iterable<RightT> rightValues = map.getOrDefault(key,
Collections.singletonList(null));
+
+ rightValues.forEach(
+ rightValue -> {
+ joiner.apply(context.element().getValue(), rightValue,
outCollector);
+ context.output(Pair.of(key, outCollector.get()));
+ });
+ }
+ }
+}
diff --git
a/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/org/apache/beam/sdk/extensions/euphoria/beam/FlowTranslator.java
b/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/org/apache/beam/sdk/extensions/euphoria/beam/FlowTranslator.java
index 581abec9f46..36cb7c3af05 100644
---
a/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/org/apache/beam/sdk/extensions/euphoria/beam/FlowTranslator.java
+++
b/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/org/apache/beam/sdk/extensions/euphoria/beam/FlowTranslator.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.extensions.euphoria.beam;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import java.util.Collection;
@@ -59,6 +60,7 @@
// extended operators
translators.put(ReduceByKey.class, new ReduceByKeyTranslator());
translators.put(ReduceStateByKey.class, new ReduceStateByKeyTranslator());
+ translators.put(Join.class, new BrodcastHashJoinTranslator());
translators.put(Join.class, new JoinTranslator());
}
@@ -78,9 +80,10 @@ private static boolean
isOperatorDirectlyTranslatable(Operator operator){
return false;
}
+ @VisibleForTesting
@Nullable
@SuppressWarnings("unchecked")
- private static OperatorTranslator getTranslatorIfAvailable(Operator
operator){
+ static OperatorTranslator getTranslatorIfAvailable(Operator operator) {
Collection<OperatorTranslator> availableTranslators =
translators.get(operator.getClass());
if (availableTranslators.isEmpty()){
return null;
diff --git
a/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/org/apache/beam/sdk/extensions/euphoria/beam/JoinTranslator.java
b/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/org/apache/beam/sdk/extensions/euphoria/beam/JoinTranslator.java
index 7eaa336501b..825192ebeed 100644
---
a/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/org/apache/beam/sdk/extensions/euphoria/beam/JoinTranslator.java
+++
b/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/org/apache/beam/sdk/extensions/euphoria/beam/JoinTranslator.java
@@ -17,10 +17,9 @@
*/
package org.apache.beam.sdk.extensions.euphoria.beam;
-import java.util.List;
+import static
org.apache.beam.sdk.extensions.euphoria.beam.common.OperatorTranslatorUtil.getKVInputCollection;
+
import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.extensions.euphoria.beam.common.InputToKvDoFn;
import org.apache.beam.sdk.extensions.euphoria.beam.io.KryoCoder;
import org.apache.beam.sdk.extensions.euphoria.beam.join.FullJoinFn;
import org.apache.beam.sdk.extensions.euphoria.beam.join.InnerJoinFn;
@@ -30,7 +29,6 @@
import org.apache.beam.sdk.extensions.euphoria.beam.window.WindowingUtils;
import
org.apache.beam.sdk.extensions.euphoria.core.client.dataset.windowing.Window;
import
org.apache.beam.sdk.extensions.euphoria.core.client.functional.BinaryFunctor;
-import
org.apache.beam.sdk.extensions.euphoria.core.client.functional.UnaryFunction;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.Join;
import org.apache.beam.sdk.extensions.euphoria.core.client.util.Pair;
import org.apache.beam.sdk.transforms.ParDo;
@@ -60,13 +58,16 @@
Coder<K> keyCoder = context.getCoder(operator.getLeftKeyExtractor());
// get input data-sets transformed to Pcollections<KV<K,LeftT/RightT>>
- List<PCollection<Object>> inputs = context.getInputs(operator);
+ @SuppressWarnings("unchecked") final PCollection<LeftT> left =
(PCollection<LeftT>) context
+ .getInputs(operator).get(0);
+ @SuppressWarnings("unchecked") final PCollection<RightT> right =
(PCollection<RightT>) context
+ .getInputs(operator).get(1);
- PCollection<KV<K, LeftT>> leftKvInput = getKVInputCollection(inputs.get(0),
+ PCollection<KV<K, LeftT>> leftKvInput = getKVInputCollection(left,
operator.getLeftKeyExtractor(),
keyCoder, new KryoCoder<>(), "::extract-keys-left");
- PCollection<KV<K, RightT>> rightKvInput =
getKVInputCollection(inputs.get(1),
+ PCollection<KV<K, RightT>> rightKvInput = getKVInputCollection(right,
operator.getRightKeyExtractor(),
keyCoder, new KryoCoder<>(), "::extract-keys-right");
@@ -93,22 +94,6 @@
return coGrouped.apply(joinFn.getFnName(), ParDo.of(joinFn));
}
- private <K, ValueT> PCollection<KV<K, ValueT>> getKVInputCollection(
- PCollection<Object> inputPCollection,
- UnaryFunction<ValueT, K> keyExtractor,
- Coder<K> keyCoder, Coder<ValueT> valueCoder, String transformName) {
-
- @SuppressWarnings("unchecked")
- PCollection<ValueT> typedInput = (PCollection<ValueT>) inputPCollection;
- typedInput.setCoder(valueCoder);
-
- PCollection<KV<K, ValueT>> kvInput =
- typedInput.apply(transformName, ParDo.of(new
InputToKvDoFn<>(keyExtractor)));
- kvInput.setCoder(KvCoder.of(keyCoder, valueCoder));
-
- return kvInput;
- }
-
private <K, LeftT, RightT, OutputT, W extends Window<W>> JoinFn<LeftT,
RightT, K, OutputT>
chooseJoinFn(
Join<LeftT, RightT, K, OutputT, W> operator, TupleTag<LeftT> leftTag,
diff --git
a/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/org/apache/beam/sdk/extensions/euphoria/beam/common/OperatorTranslatorUtil.java
b/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/org/apache/beam/sdk/extensions/euphoria/beam/common/OperatorTranslatorUtil.java
new file mode 100644
index 00000000000..93a79fb788e
--- /dev/null
+++
b/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/org/apache/beam/sdk/extensions/euphoria/beam/common/OperatorTranslatorUtil.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.euphoria.beam.common;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import
org.apache.beam.sdk.extensions.euphoria.core.client.functional.UnaryFunction;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+
+/**
+ * Shared utility methods among operator translators.
+ */
+public class OperatorTranslatorUtil {
+
+ /**
+ * Transform input to KV elements.
+ */
+ public static <K, ValueT> PCollection<KV<K, ValueT>> getKVInputCollection(
+ PCollection<ValueT> inputPCollection,
+ UnaryFunction<ValueT, K> keyExtractor,
+ Coder<K> keyCoder, Coder<ValueT> valueCoder, String transformName) {
+
+ @SuppressWarnings("unchecked")
+ PCollection<ValueT> typedInput = inputPCollection;
+ typedInput.setCoder(valueCoder);
+
+ PCollection<KV<K, ValueT>> kvInput =
+ typedInput.apply(transformName, ParDo.of(new
InputToKvDoFn<>(keyExtractor)));
+ kvInput.setCoder(KvCoder.of(keyCoder, valueCoder));
+
+ return kvInput;
+ }
+
+}
diff --git
a/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/JoinTest.java
b/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/JoinTest.java
index ddef3f8aa80..6f7bd33eb61 100644
---
a/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/JoinTest.java
+++
b/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/JoinTest.java
@@ -18,8 +18,11 @@
package org.apache.beam.sdk.extensions.euphoria.beam;
import static java.util.Arrays.asList;
+import static org.hamcrest.core.IsInstanceOf.instanceOf;
+import static org.junit.Assert.assertThat;
import java.util.Optional;
+import org.apache.beam.sdk.extensions.euphoria.core.client.dataset.Dataset;
import org.apache.beam.sdk.extensions.euphoria.core.client.flow.Flow;
import org.apache.beam.sdk.extensions.euphoria.core.client.io.Collector;
import org.apache.beam.sdk.extensions.euphoria.core.client.io.ListDataSink;
@@ -27,7 +30,9 @@
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.FullJoin;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.Join;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.LeftJoin;
+import
org.apache.beam.sdk.extensions.euphoria.core.client.operator.MapElements;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.RightJoin;
+import
org.apache.beam.sdk.extensions.euphoria.core.client.operator.hint.SizeHint;
import org.apache.beam.sdk.extensions.euphoria.core.client.util.Pair;
import org.apache.beam.sdk.extensions.euphoria.testing.DatasetAssert;
import org.junit.Test;
@@ -37,180 +42,214 @@
*/
public class JoinTest {
- @Test
- public void simpleInnerJoinTest() {
- final Flow flow = Flow.create();
+ static <T> Dataset<T> addFitsInMemoryHint(Dataset<T> smallDataset) {
+ return MapElements.named("smallSide")
+ .of(smallDataset)
+ .using(e -> e)
+ .output(SizeHint.FITS_IN_MEMORY);
+ }
+
+ static void checkBrodcastHashJoinTranslatorUsage(Flow flow) {
+ OperatorTranslator operatorTranslator =
+ flow.operators()
+ .stream()
+ .filter(node -> node instanceof Join)
+ .map(FlowTranslator::getTranslatorIfAvailable)
+ .findFirst()
+ .orElse(null);
+
+ assertThat(operatorTranslator,
instanceOf(BrodcastHashJoinTranslator.class));
+ }
- ListDataSource<Pair<Integer, String>> left =
- ListDataSource.bounded(
- asList(
- Pair.of(1, "L v1"), Pair.of(1, "L v2"),
- Pair.of(2, "L v1"), Pair.of(2, "L v2"),
- Pair.of(3, "L v1")
- ));
-
- ListDataSource<Pair<Integer, Integer>> right =
- ListDataSource.bounded(
- asList(
- Pair.of(1, 1), Pair.of(1, 10),
- Pair.of(2, 20),
- Pair.of(4, 40)
- ));
+ static void checkRightJoin(
+ Flow flow, Dataset<Pair<Integer, String>> left, Dataset<Pair<Integer,
Integer>> right) {
ListDataSink<Pair<Integer, Pair<String, Integer>>> output =
ListDataSink.get();
- Join.of(flow.createInput(left), flow.createInput(right))
+ RightJoin.of(left, right)
.by(Pair::getFirst, Pair::getFirst)
.using(
- (Pair<Integer, String> l, Pair<Integer, Integer> r,
Collector<Pair<String, Integer>> c)
- -> c.collect(Pair.of(l.getSecond(), r.getSecond())))
+ (Optional<Pair<Integer, String>> l,
+ Pair<Integer, Integer> r,
+ Collector<Pair<String, Integer>> c) ->
+ c.collect(Pair.of(l.orElse(Pair.of(null, null)).getSecond(),
r.getSecond())))
.output()
.persist(output);
BeamExecutor executor = TestUtils.createExecutor();
executor.execute(flow);
- DatasetAssert.unorderedEquals(output.getOutputs(),
- Pair.of(1, Pair.of("L v1", 1)), Pair.of(1, Pair.of("L v1", 10)),
- Pair.of(1, Pair.of("L v2", 1)), Pair.of(1, Pair.of("L v2", 10)),
-
- Pair.of(2, Pair.of("L v1", 20)), Pair.of(2, Pair.of("L v2", 20))
- );
-
+ DatasetAssert.unorderedEquals(
+ output.getOutputs(),
+ Pair.of(1, Pair.of("L v1", 1)),
+ Pair.of(1, Pair.of("L v1", 10)),
+ Pair.of(1, Pair.of("L v2", 1)),
+ Pair.of(1, Pair.of("L v2", 10)),
+ Pair.of(2, Pair.of("L v1", 20)),
+ Pair.of(2, Pair.of("L v2", 20)),
+ Pair.of(4, Pair.of(null, 40)));
}
- @Test
- public void simpleLeftJoinTest() {
- final Flow flow = Flow.create();
-
- ListDataSource<Pair<Integer, String>> left =
- ListDataSource.bounded(
- asList(
- Pair.of(1, "L v1"), Pair.of(1, "L v2"),
- Pair.of(2, "L v1"), Pair.of(2, "L v2"),
- Pair.of(3, "L v1")
- ));
-
- ListDataSource<Pair<Integer, Integer>> right =
- ListDataSource.bounded(
- asList(
- Pair.of(1, 1), Pair.of(1, 10),
- Pair.of(2, 20),
- Pair.of(4, 40)
- ));
+ static void checkLeftJoin(
+ Flow flow, Dataset<Pair<Integer, String>> left, Dataset<Pair<Integer,
Integer>> right) {
ListDataSink<Pair<Integer, Pair<String, Integer>>> output =
ListDataSink.get();
- LeftJoin.of(flow.createInput(left), flow.createInput(right))
+ LeftJoin.of(left, right)
.by(Pair::getFirst, Pair::getFirst)
- .using((Pair<Integer, String> l, Optional<Pair<Integer, Integer>> r,
- Collector<Pair<String, Integer>> c) ->
- c.collect(Pair.of(l.getSecond(), r.orElse(Pair.of(null,
null)).getSecond())))
+ .using(
+ (Pair<Integer, String> l,
+ Optional<Pair<Integer, Integer>> r,
+ Collector<Pair<String, Integer>> c) ->
+ c.collect(Pair.of(l.getSecond(), r.orElse(Pair.of(null,
null)).getSecond())))
.output()
.persist(output);
BeamExecutor executor = TestUtils.createExecutor();
executor.execute(flow);
- DatasetAssert.unorderedEquals(output.getOutputs(),
- Pair.of(1, Pair.of("L v1", 1)), Pair.of(1, Pair.of("L v1", 10)),
- Pair.of(1, Pair.of("L v2", 1)), Pair.of(1, Pair.of("L v2", 10)),
-
- Pair.of(2, Pair.of("L v1", 20)), Pair.of(2, Pair.of("L v2", 20)),
-
- Pair.of(3, Pair.of("L v1", null))
- );
-
+ DatasetAssert.unorderedEquals(
+ output.getOutputs(),
+ Pair.of(1, Pair.of("L v1", 1)),
+ Pair.of(1, Pair.of("L v1", 10)),
+ Pair.of(1, Pair.of("L v2", 1)),
+ Pair.of(1, Pair.of("L v2", 10)),
+ Pair.of(2, Pair.of("L v1", 20)),
+ Pair.of(2, Pair.of("L v2", 20)),
+ Pair.of(3, Pair.of("L v1", null)));
}
@Test
- public void simpleRightJoinTest() {
+ public void simpleInnerJoinTest() {
final Flow flow = Flow.create();
- ListDataSource<Pair<Integer, String>> left =
- ListDataSource.bounded(
- asList(
- Pair.of(1, "L v1"), Pair.of(1, "L v2"),
- Pair.of(2, "L v1"), Pair.of(2, "L v2"),
- Pair.of(3, "L v1")
- ));
-
- ListDataSource<Pair<Integer, Integer>> right =
- ListDataSource.bounded(
- asList(
- Pair.of(1, 1), Pair.of(1, 10),
- Pair.of(2, 20),
- Pair.of(4, 40)
- ));
+ ListDataSource<Pair<Integer, String>> left = getLeftDataSource();
+ ListDataSource<Pair<Integer, Integer>> right = getRightDataSource();
ListDataSink<Pair<Integer, Pair<String, Integer>>> output =
ListDataSink.get();
- RightJoin.of(flow.createInput(left), flow.createInput(right))
+ Join.of(flow.createInput(left), flow.createInput(right))
.by(Pair::getFirst, Pair::getFirst)
.using(
- (Optional<Pair<Integer, String>> l, Pair<Integer, Integer> r,
+ (Pair<Integer, String> l,
+ Pair<Integer, Integer> r,
Collector<Pair<String, Integer>> c) ->
- c.collect(Pair.of(l.orElse(Pair.of(null, null)).getSecond(),
r.getSecond())))
+ c.collect(Pair.of(l.getSecond(), r.getSecond())))
.output()
.persist(output);
BeamExecutor executor = TestUtils.createExecutor();
executor.execute(flow);
- DatasetAssert.unorderedEquals(output.getOutputs(),
- Pair.of(1, Pair.of("L v1", 1)), Pair.of(1, Pair.of("L v1", 10)),
- Pair.of(1, Pair.of("L v2", 1)), Pair.of(1, Pair.of("L v2", 10)),
+ DatasetAssert.unorderedEquals(
+ output.getOutputs(),
+ Pair.of(1, Pair.of("L v1", 1)),
+ Pair.of(1, Pair.of("L v1", 10)),
+ Pair.of(1, Pair.of("L v2", 1)),
+ Pair.of(1, Pair.of("L v2", 10)),
+ Pair.of(2, Pair.of("L v1", 20)),
+ Pair.of(2, Pair.of("L v2", 20)));
+ }
+
+ @Test
+ public void simpleLeftJoinTest() {
+ final Flow flow = Flow.create();
+
+ ListDataSource<Pair<Integer, String>> left = getLeftDataSource();
+ ListDataSource<Pair<Integer, Integer>> right = getRightDataSource();
- Pair.of(2, Pair.of("L v1", 20)), Pair.of(2, Pair.of("L v2", 20)),
+ checkLeftJoin(flow, flow.createInput(left), flow.createInput(right));
+ }
+
+ @Test
+ public void simpleRightJoinTest() {
+ final Flow flow = Flow.create();
- Pair.of(4, Pair.of(null, 40))
- );
+ ListDataSource<Pair<Integer, String>> left = getLeftDataSource();
+ ListDataSource<Pair<Integer, Integer>> right = getRightDataSource();
+ checkRightJoin(flow, flow.createInput(left), flow.createInput(right));
}
@Test
public void simpleFullJoinTest() {
final Flow flow = Flow.create();
- ListDataSource<Pair<Integer, String>> left =
- ListDataSource.bounded(
- asList(
- Pair.of(1, "L v1"), Pair.of(1, "L v2"),
- Pair.of(2, "L v1"), Pair.of(2, "L v2"),
- Pair.of(3, "L v1")
- ));
-
- ListDataSource<Pair<Integer, Integer>> right =
- ListDataSource.bounded(
- asList(
- Pair.of(1, 1), Pair.of(1, 10),
- Pair.of(2, 20),
- Pair.of(4, 40)
- ));
+ ListDataSource<Pair<Integer, String>> left = getLeftDataSource();
+ ListDataSource<Pair<Integer, Integer>> right = getRightDataSource();
ListDataSink<Pair<Integer, Pair<String, Integer>>> output =
ListDataSink.get();
FullJoin.of(flow.createInput(left), flow.createInput(right))
.by(Pair::getFirst, Pair::getFirst)
- .using((Optional<Pair<Integer, String>> l, Optional<Pair<Integer,
Integer>> r,
- Collector<Pair<String, Integer>> c) -> c.collect(Pair.of(
- l.orElse(Pair.of(null, null)).getSecond(), r.orElse(Pair.of(null,
null)).getSecond())))
+ .using(
+ (Optional<Pair<Integer, String>> l,
+ Optional<Pair<Integer, Integer>> r,
+ Collector<Pair<String, Integer>> c) ->
+ c.collect(
+ Pair.of(
+ l.orElse(Pair.of(null, null)).getSecond(),
+ r.orElse(Pair.of(null, null)).getSecond())))
.output()
.persist(output);
BeamExecutor executor = TestUtils.createExecutor();
executor.execute(flow);
- DatasetAssert.unorderedEquals(output.getOutputs(),
- Pair.of(1, Pair.of("L v1", 1)), Pair.of(1, Pair.of("L v1", 10)),
- Pair.of(1, Pair.of("L v2", 1)), Pair.of(1, Pair.of("L v2", 10)),
+ DatasetAssert.unorderedEquals(
+ output.getOutputs(),
+ Pair.of(1, Pair.of("L v1", 1)),
+ Pair.of(1, Pair.of("L v1", 10)),
+ Pair.of(1, Pair.of("L v2", 1)),
+ Pair.of(1, Pair.of("L v2", 10)),
+ Pair.of(2, Pair.of("L v1", 20)),
+ Pair.of(2, Pair.of("L v2", 20)),
+ Pair.of(3, Pair.of("L v1", null)),
+ Pair.of(4, Pair.of(null, 40)));
+ }
+
+ @Test
+ public void simpleBroadcastHashRightJoinTest() {
+ final Flow flow = Flow.create();
+
+ ListDataSource<Pair<Integer, String>> left = getLeftDataSource();
+ ListDataSource<Pair<Integer, Integer>> right = getRightDataSource();
- Pair.of(2, Pair.of("L v1", 20)), Pair.of(2, Pair.of("L v2", 20)),
+ final Dataset<Pair<Integer, String>> smallLeftSide =
+ addFitsInMemoryHint(flow.createInput(left));
- Pair.of(3, Pair.of("L v1", null)),
- Pair.of(4, Pair.of(null, 40))
- );
+ checkRightJoin(flow, smallLeftSide, flow.createInput(right));
+
+ checkBrodcastHashJoinTranslatorUsage(flow);
+ }
+
+ @Test
+ public void simpleBroadcastHashLefttJoinTest() {
+ final Flow flow = Flow.create();
+
+ ListDataSource<Pair<Integer, String>> left = getLeftDataSource();
+ ListDataSource<Pair<Integer, Integer>> right = getRightDataSource();
+
+ final Dataset<Pair<Integer, Integer>> smallRightSide =
+ addFitsInMemoryHint(flow.createInput(right));
+
+ checkLeftJoin(flow, flow.createInput(left), smallRightSide);
+ checkBrodcastHashJoinTranslatorUsage(flow);
}
+ ListDataSource<Pair<Integer, String>> getLeftDataSource() {
+ return ListDataSource.bounded(
+ asList(
+ Pair.of(1, "L v1"),
+ Pair.of(1, "L v2"),
+ Pair.of(2, "L v1"),
+ Pair.of(2, "L v2"),
+ Pair.of(3, "L v1")));
+ }
+
+ ListDataSource<Pair<Integer, Integer>> getRightDataSource() {
+ return ListDataSource.bounded(
+ asList(Pair.of(1, 1), Pair.of(1, 10), Pair.of(2, 20), Pair.of(4, 40)));
+ }
}
diff --git
a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/test/java/org/apache/beam/sdk/extensions/euphoria/operator/test/BeamExecutorProvider.java
b/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/OperatorsTestSuite.java
similarity index 52%
rename from
sdks/java/extensions/euphoria/euphoria-operator-testkit/src/test/java/org/apache/beam/sdk/extensions/euphoria/operator/test/BeamExecutorProvider.java
rename to
sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/OperatorsTestSuite.java
index 0181ed50011..571f42ebb9a 100644
---
a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/test/java/org/apache/beam/sdk/extensions/euphoria/operator/test/BeamExecutorProvider.java
+++
b/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/OperatorsTestSuite.java
@@ -15,23 +15,48 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.sdk.extensions.euphoria.operator.test;
+package org.apache.beam.sdk.extensions.euphoria.beam;
import java.time.Duration;
-import org.apache.beam.sdk.extensions.euphoria.beam.BeamExecutor;
+import org.apache.beam.sdk.extensions.euphoria.beam.testkit.FlatMapTest;
+import org.apache.beam.sdk.extensions.euphoria.beam.testkit.JoinTest;
+import org.apache.beam.sdk.extensions.euphoria.beam.testkit.ReduceByKeyTest;
+import org.apache.beam.sdk.extensions.euphoria.beam.testkit.UnionTest;
+import
org.apache.beam.sdk.extensions.euphoria.beam.testkit.junit.ExecutorEnvironment;
+import
org.apache.beam.sdk.extensions.euphoria.beam.testkit.junit.ExecutorProvider;
+import
org.apache.beam.sdk.extensions.euphoria.beam.testkit.junit.ExecutorProviderRunner;
import org.apache.beam.sdk.extensions.euphoria.core.executor.Executor;
-import
org.apache.beam.sdk.extensions.euphoria.operator.test.junit.ExecutorEnvironment;
-import
org.apache.beam.sdk.extensions.euphoria.operator.test.junit.ExecutorProvider;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
/**
- * Local {@link ExecutorProvider}.
+ * Euphoria operators test suite.
*/
-public interface BeamExecutorProvider extends ExecutorProvider {
+@RunWith(ExecutorProviderRunner.class)
[email protected]({
+ // BroadcastHashJoinTest.class,
+ // CountByKeyTest.class,
+ // DistinctTest.class,
+ // FilterTest.class,
+ FlatMapTest.class,
+ JoinTest.class,
+ // JoinWindowEnforcementTest.class,
+ // MapElementsTest.class,
+ ReduceByKeyTest.class,
+ // ReduceStateByKeyTest.class,
+ // SumByKeyTest.class,
+ // TopPerKeyTest.class,
+ // SortTest.class,
+ UnionTest.class,
+ // WindowingTest.class,
+ // WatermarkTest.class,
+})
+public class OperatorsTestSuite implements ExecutorProvider {
@Override
- default ExecutorEnvironment newExecutorEnvironment() throws Exception {
+ public ExecutorEnvironment newExecutorEnvironment() throws Exception {
final String[] args = {"--runner=DirectRunner"};
final PipelineOptions options =
PipelineOptionsFactory.fromArgs(args).as(PipelineOptions.class);
final Executor executor = new
BeamExecutor(options).withAllowedLateness(Duration.ofHours(1));
diff --git
a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/BroadcastHashJoinTest.java
b/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/BroadcastHashJoinTest.java
similarity index 96%
rename from
sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/BroadcastHashJoinTest.java
rename to
sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/BroadcastHashJoinTest.java
index 4ca7c4aaf17..77936e07138 100644
---
a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/BroadcastHashJoinTest.java
+++
b/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/BroadcastHashJoinTest.java
@@ -15,11 +15,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.sdk.extensions.euphoria.operator.test;
+package org.apache.beam.sdk.extensions.euphoria.beam.testkit;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
+import
org.apache.beam.sdk.extensions.euphoria.beam.testkit.junit.AbstractOperatorTest;
+import org.apache.beam.sdk.extensions.euphoria.beam.testkit.junit.Processing;
import org.apache.beam.sdk.extensions.euphoria.core.client.dataset.Dataset;
import org.apache.beam.sdk.extensions.euphoria.core.client.io.Collector;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.LeftJoin;
@@ -27,8 +29,6 @@
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.RightJoin;
import
org.apache.beam.sdk.extensions.euphoria.core.client.operator.hint.SizeHint;
import org.apache.beam.sdk.extensions.euphoria.core.client.util.Pair;
-import
org.apache.beam.sdk.extensions.euphoria.operator.test.junit.AbstractOperatorTest;
-import org.apache.beam.sdk.extensions.euphoria.operator.test.junit.Processing;
import org.junit.Test;
/**
diff --git
a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/CountByKeyTest.java
b/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/CountByKeyTest.java
similarity index 93%
rename from
sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/CountByKeyTest.java
rename to
sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/CountByKeyTest.java
index 933f02b0f7c..5a31786e3b3 100644
---
a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/CountByKeyTest.java
+++
b/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/CountByKeyTest.java
@@ -15,19 +15,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.sdk.extensions.euphoria.operator.test;
+package org.apache.beam.sdk.extensions.euphoria.beam.testkit;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
+import
org.apache.beam.sdk.extensions.euphoria.beam.testkit.junit.AbstractOperatorTest;
+import org.apache.beam.sdk.extensions.euphoria.beam.testkit.junit.Processing;
+import
org.apache.beam.sdk.extensions.euphoria.beam.testkit.junit.Processing.Type;
import org.apache.beam.sdk.extensions.euphoria.core.client.dataset.Dataset;
import
org.apache.beam.sdk.extensions.euphoria.core.client.dataset.windowing.Time;
import
org.apache.beam.sdk.extensions.euphoria.core.client.operator.AssignEventTime;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.CountByKey;
import org.apache.beam.sdk.extensions.euphoria.core.client.util.Pair;
-import
org.apache.beam.sdk.extensions.euphoria.operator.test.junit.AbstractOperatorTest;
-import org.apache.beam.sdk.extensions.euphoria.operator.test.junit.Processing;
-import
org.apache.beam.sdk.extensions.euphoria.operator.test.junit.Processing.Type;
import org.junit.Test;
/** Test operator {@code CountByKey}. */
diff --git
a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/DistinctTest.java
b/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/DistinctTest.java
similarity index 93%
rename from
sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/DistinctTest.java
rename to
sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/DistinctTest.java
index 8f64c43eb4e..d1260071c2d 100644
---
a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/DistinctTest.java
+++
b/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/DistinctTest.java
@@ -15,20 +15,20 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.sdk.extensions.euphoria.operator.test;
+package org.apache.beam.sdk.extensions.euphoria.beam.testkit;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import
org.apache.beam.sdk.extensions.euphoria.beam.testkit.junit.AbstractOperatorTest;
+import org.apache.beam.sdk.extensions.euphoria.beam.testkit.junit.Processing;
+import
org.apache.beam.sdk.extensions.euphoria.beam.testkit.junit.Processing.Type;
import org.apache.beam.sdk.extensions.euphoria.core.client.dataset.Dataset;
import
org.apache.beam.sdk.extensions.euphoria.core.client.dataset.windowing.Time;
import
org.apache.beam.sdk.extensions.euphoria.core.client.operator.AssignEventTime;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.Distinct;
import org.apache.beam.sdk.extensions.euphoria.core.client.util.Pair;
-import
org.apache.beam.sdk.extensions.euphoria.operator.test.junit.AbstractOperatorTest;
-import org.apache.beam.sdk.extensions.euphoria.operator.test.junit.Processing;
-import
org.apache.beam.sdk.extensions.euphoria.operator.test.junit.Processing.Type;
import org.junit.Test;
/** Test for the {@link Distinct} operator. */
diff --git
a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/FilterTest.java
b/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/FilterTest.java
similarity index 88%
rename from
sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/FilterTest.java
rename to
sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/FilterTest.java
index 974c1df67dc..6a42e49fc5d 100644
---
a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/FilterTest.java
+++
b/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/FilterTest.java
@@ -15,14 +15,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.sdk.extensions.euphoria.operator.test;
+package org.apache.beam.sdk.extensions.euphoria.beam.testkit;
import java.util.Arrays;
import java.util.List;
+import
org.apache.beam.sdk.extensions.euphoria.beam.testkit.junit.AbstractOperatorTest;
+import org.apache.beam.sdk.extensions.euphoria.beam.testkit.junit.Processing;
import org.apache.beam.sdk.extensions.euphoria.core.client.dataset.Dataset;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.Filter;
-import
org.apache.beam.sdk.extensions.euphoria.operator.test.junit.AbstractOperatorTest;
-import org.apache.beam.sdk.extensions.euphoria.operator.test.junit.Processing;
import org.junit.Test;
/** Test operator {@code Filter}. */
diff --git
a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/FlatMapTest.java
b/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/FlatMapTest.java
similarity index 91%
rename from
sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/FlatMapTest.java
rename to
sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/FlatMapTest.java
index 24f224085d0..f8d04e2c7bf 100644
---
a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/FlatMapTest.java
+++
b/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/FlatMapTest.java
@@ -15,20 +15,20 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.sdk.extensions.euphoria.operator.test;
+package org.apache.beam.sdk.extensions.euphoria.beam.testkit;
import static org.junit.Assert.assertEquals;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import
org.apache.beam.sdk.extensions.euphoria.beam.testkit.accumulators.SnapshotProvider;
+import
org.apache.beam.sdk.extensions.euphoria.beam.testkit.junit.AbstractOperatorTest;
+import org.apache.beam.sdk.extensions.euphoria.beam.testkit.junit.Processing;
import org.apache.beam.sdk.extensions.euphoria.core.client.dataset.Dataset;
import
org.apache.beam.sdk.extensions.euphoria.core.client.functional.UnaryFunctor;
import org.apache.beam.sdk.extensions.euphoria.core.client.io.Collector;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.FlatMap;
-import
org.apache.beam.sdk.extensions.euphoria.operator.test.accumulators.SnapshotProvider;
-import
org.apache.beam.sdk.extensions.euphoria.operator.test.junit.AbstractOperatorTest;
-import org.apache.beam.sdk.extensions.euphoria.operator.test.junit.Processing;
import org.junit.Test;
/** Test operator {@code FlatMap}. */
diff --git
a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/IntWindow.java
b/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/IntWindow.java
similarity index 95%
rename from
sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/IntWindow.java
rename to
sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/IntWindow.java
index f4dcee06697..a8c6bea934a 100644
---
a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/IntWindow.java
+++
b/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/IntWindow.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.sdk.extensions.euphoria.operator.test;
+package org.apache.beam.sdk.extensions.euphoria.beam.testkit;
import
org.apache.beam.sdk.extensions.euphoria.core.client.dataset.windowing.Window;
diff --git
a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/JoinTest.java
b/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/JoinTest.java
similarity index 98%
rename from
sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/JoinTest.java
rename to
sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/JoinTest.java
index bd4b5761990..86aae13672f 100644
---
a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/JoinTest.java
+++
b/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/JoinTest.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.sdk.extensions.euphoria.operator.test;
+package org.apache.beam.sdk.extensions.euphoria.beam.testkit;
import static org.junit.Assert.assertEquals;
@@ -29,6 +29,9 @@
import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.extensions.euphoria.beam.io.KryoCoder;
+import
org.apache.beam.sdk.extensions.euphoria.beam.testkit.accumulators.SnapshotProvider;
+import
org.apache.beam.sdk.extensions.euphoria.beam.testkit.junit.AbstractOperatorTest;
+import org.apache.beam.sdk.extensions.euphoria.beam.testkit.junit.Processing;
import org.apache.beam.sdk.extensions.euphoria.beam.window.BeamWindowing;
import org.apache.beam.sdk.extensions.euphoria.core.client.dataset.Dataset;
import
org.apache.beam.sdk.extensions.euphoria.core.client.dataset.windowing.TimeInterval;
@@ -43,9 +46,6 @@
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.RightJoin;
import org.apache.beam.sdk.extensions.euphoria.core.client.util.Pair;
import org.apache.beam.sdk.extensions.euphoria.core.client.util.Triple;
-import
org.apache.beam.sdk.extensions.euphoria.operator.test.accumulators.SnapshotProvider;
-import
org.apache.beam.sdk.extensions.euphoria.operator.test.junit.AbstractOperatorTest;
-import org.apache.beam.sdk.extensions.euphoria.operator.test.junit.Processing;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
diff --git
a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/JoinWindowEnforcementTest.java
b/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/JoinWindowEnforcementTest.java
similarity index 96%
rename from
sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/JoinWindowEnforcementTest.java
rename to
sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/JoinWindowEnforcementTest.java
index 2e8bc5157b2..16281cf4bfd 100644
---
a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/JoinWindowEnforcementTest.java
+++
b/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/JoinWindowEnforcementTest.java
@@ -15,13 +15,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.sdk.extensions.euphoria.operator.test;
+package org.apache.beam.sdk.extensions.euphoria.beam.testkit;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import
org.apache.beam.sdk.extensions.euphoria.beam.testkit.junit.AbstractOperatorTest;
+import org.apache.beam.sdk.extensions.euphoria.beam.testkit.junit.Processing;
import org.apache.beam.sdk.extensions.euphoria.core.client.dataset.Dataset;
import
org.apache.beam.sdk.extensions.euphoria.core.client.dataset.windowing.Count;
import
org.apache.beam.sdk.extensions.euphoria.core.client.dataset.windowing.GlobalWindowing;
@@ -32,8 +34,6 @@
import
org.apache.beam.sdk.extensions.euphoria.core.client.operator.ReduceByKey;
import
org.apache.beam.sdk.extensions.euphoria.core.client.operator.WindowingRequiredException;
import org.apache.beam.sdk.extensions.euphoria.core.client.util.Pair;
-import
org.apache.beam.sdk.extensions.euphoria.operator.test.junit.AbstractOperatorTest;
-import org.apache.beam.sdk.extensions.euphoria.operator.test.junit.Processing;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runners.Parameterized;
diff --git
a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/MapElementsTest.java
b/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/MapElementsTest.java
similarity index 91%
rename from
sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/MapElementsTest.java
rename to
sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/MapElementsTest.java
index 9fead761031..82956f92934 100644
---
a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/MapElementsTest.java
+++
b/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/MapElementsTest.java
@@ -15,20 +15,20 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.sdk.extensions.euphoria.operator.test;
+package org.apache.beam.sdk.extensions.euphoria.beam.testkit;
import static org.junit.Assert.assertEquals;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import
org.apache.beam.sdk.extensions.euphoria.beam.testkit.accumulators.SnapshotProvider;
+import
org.apache.beam.sdk.extensions.euphoria.beam.testkit.junit.AbstractOperatorTest;
+import org.apache.beam.sdk.extensions.euphoria.beam.testkit.junit.Processing;
import org.apache.beam.sdk.extensions.euphoria.core.client.dataset.Dataset;
import
org.apache.beam.sdk.extensions.euphoria.core.client.functional.UnaryFunction;
import
org.apache.beam.sdk.extensions.euphoria.core.client.functional.UnaryFunctionEnv;
import
org.apache.beam.sdk.extensions.euphoria.core.client.operator.MapElements;
-import
org.apache.beam.sdk.extensions.euphoria.operator.test.accumulators.SnapshotProvider;
-import
org.apache.beam.sdk.extensions.euphoria.operator.test.junit.AbstractOperatorTest;
-import org.apache.beam.sdk.extensions.euphoria.operator.test.junit.Processing;
import org.junit.Test;
/** Tests for operator {@code MapElements}. */
diff --git
a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/ReduceByKeyTest.java
b/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/ReduceByKeyTest.java
similarity index 99%
rename from
sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/ReduceByKeyTest.java
rename to
sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/ReduceByKeyTest.java
index fc6f1e248e2..a29cc277105 100644
---
a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/ReduceByKeyTest.java
+++
b/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/ReduceByKeyTest.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.sdk.extensions.euphoria.operator.test;
+package org.apache.beam.sdk.extensions.euphoria.beam.testkit;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
@@ -38,6 +38,9 @@
import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.extensions.euphoria.beam.io.KryoCoder;
+import
org.apache.beam.sdk.extensions.euphoria.beam.testkit.accumulators.SnapshotProvider;
+import
org.apache.beam.sdk.extensions.euphoria.beam.testkit.junit.AbstractOperatorTest;
+import org.apache.beam.sdk.extensions.euphoria.beam.testkit.junit.Processing;
import org.apache.beam.sdk.extensions.euphoria.beam.window.BeamWindowing;
import org.apache.beam.sdk.extensions.euphoria.core.client.dataset.Dataset;
import
org.apache.beam.sdk.extensions.euphoria.core.client.dataset.windowing.Count;
@@ -65,9 +68,6 @@
import org.apache.beam.sdk.extensions.euphoria.core.client.util.Pair;
import org.apache.beam.sdk.extensions.euphoria.core.client.util.Sums;
import org.apache.beam.sdk.extensions.euphoria.core.client.util.Triple;
-import
org.apache.beam.sdk.extensions.euphoria.operator.test.accumulators.SnapshotProvider;
-import
org.apache.beam.sdk.extensions.euphoria.operator.test.junit.AbstractOperatorTest;
-import org.apache.beam.sdk.extensions.euphoria.operator.test.junit.Processing;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
diff --git
a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/ReduceStateByKeyTest.java
b/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/ReduceStateByKeyTest.java
similarity index 98%
rename from
sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/ReduceStateByKeyTest.java
rename to
sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/ReduceStateByKeyTest.java
index fa01abca544..8fa44e9da95 100644
---
a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/ReduceStateByKeyTest.java
+++
b/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/ReduceStateByKeyTest.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.sdk.extensions.euphoria.operator.test;
+package org.apache.beam.sdk.extensions.euphoria.beam.testkit;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -29,6 +29,9 @@
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
+import
org.apache.beam.sdk.extensions.euphoria.beam.testkit.accumulators.SnapshotProvider;
+import
org.apache.beam.sdk.extensions.euphoria.beam.testkit.junit.AbstractOperatorTest;
+import org.apache.beam.sdk.extensions.euphoria.beam.testkit.junit.Processing;
import org.apache.beam.sdk.extensions.euphoria.core.client.dataset.Dataset;
import
org.apache.beam.sdk.extensions.euphoria.core.client.dataset.windowing.Count;
import
org.apache.beam.sdk.extensions.euphoria.core.client.dataset.windowing.Session;
@@ -55,9 +58,6 @@
import
org.apache.beam.sdk.extensions.euphoria.core.client.triggers.TriggerContext;
import org.apache.beam.sdk.extensions.euphoria.core.client.util.Pair;
import org.apache.beam.sdk.extensions.euphoria.core.client.util.Triple;
-import
org.apache.beam.sdk.extensions.euphoria.operator.test.accumulators.SnapshotProvider;
-import
org.apache.beam.sdk.extensions.euphoria.operator.test.junit.AbstractOperatorTest;
-import org.apache.beam.sdk.extensions.euphoria.operator.test.junit.Processing;
import org.junit.Test;
/** Test operator {@code ReduceStateByKey}. */
diff --git
a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/ReduceWindowTest.java
b/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/ReduceWindowTest.java
similarity index 93%
rename from
sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/ReduceWindowTest.java
rename to
sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/ReduceWindowTest.java
index 7b9a3d66fc7..7019758af65 100644
---
a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/ReduceWindowTest.java
+++
b/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/ReduceWindowTest.java
@@ -15,18 +15,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.sdk.extensions.euphoria.operator.test;
+package org.apache.beam.sdk.extensions.euphoria.beam.testkit;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
+import
org.apache.beam.sdk.extensions.euphoria.beam.testkit.junit.AbstractOperatorTest;
+import org.apache.beam.sdk.extensions.euphoria.beam.testkit.junit.Processing;
import org.apache.beam.sdk.extensions.euphoria.core.client.dataset.Dataset;
import
org.apache.beam.sdk.extensions.euphoria.core.client.dataset.windowing.Time;
import
org.apache.beam.sdk.extensions.euphoria.core.client.operator.AssignEventTime;
import
org.apache.beam.sdk.extensions.euphoria.core.client.operator.ReduceWindow;
import org.apache.beam.sdk.extensions.euphoria.core.client.util.Sums;
-import
org.apache.beam.sdk.extensions.euphoria.operator.test.junit.AbstractOperatorTest;
-import org.apache.beam.sdk.extensions.euphoria.operator.test.junit.Processing;
import org.junit.Test;
/** Test operator {@code ReduceByKey}. */
diff --git
a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/SinkTest.java
b/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/SinkTest.java
similarity index 96%
rename from
sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/SinkTest.java
rename to
sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/SinkTest.java
index f0fe8693f8a..9841f451823 100644
---
a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/SinkTest.java
+++
b/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/SinkTest.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.sdk.extensions.euphoria.operator.test;
+package org.apache.beam.sdk.extensions.euphoria.beam.testkit;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
@@ -26,6 +26,7 @@
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import
org.apache.beam.sdk.extensions.euphoria.beam.testkit.junit.AbstractOperatorTest;
import org.apache.beam.sdk.extensions.euphoria.core.client.dataset.Dataset;
import
org.apache.beam.sdk.extensions.euphoria.core.client.dataset.windowing.Time;
import org.apache.beam.sdk.extensions.euphoria.core.client.io.Collector;
@@ -34,7 +35,6 @@
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.CountByKey;
import
org.apache.beam.sdk.extensions.euphoria.core.client.operator.ReduceByKey;
import org.apache.beam.sdk.extensions.euphoria.core.client.util.Pair;
-import
org.apache.beam.sdk.extensions.euphoria.operator.test.junit.AbstractOperatorTest;
import org.junit.Test;
/** Test that a sub-flow applied on sink is correctly preserved. */
diff --git
a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/SumByKeyTest.java
b/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/SumByKeyTest.java
similarity index 90%
rename from
sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/SumByKeyTest.java
rename to
sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/SumByKeyTest.java
index a18aa1af499..487bd22c706 100644
---
a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/SumByKeyTest.java
+++
b/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/SumByKeyTest.java
@@ -15,17 +15,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.sdk.extensions.euphoria.operator.test;
+package org.apache.beam.sdk.extensions.euphoria.beam.testkit;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
+import
org.apache.beam.sdk.extensions.euphoria.beam.testkit.junit.AbstractOperatorTest;
+import org.apache.beam.sdk.extensions.euphoria.beam.testkit.junit.Processing;
import org.apache.beam.sdk.extensions.euphoria.core.client.dataset.Dataset;
import
org.apache.beam.sdk.extensions.euphoria.core.client.dataset.windowing.Time;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.SumByKey;
import org.apache.beam.sdk.extensions.euphoria.core.client.util.Pair;
-import
org.apache.beam.sdk.extensions.euphoria.operator.test.junit.AbstractOperatorTest;
-import org.apache.beam.sdk.extensions.euphoria.operator.test.junit.Processing;
import org.junit.Test;
/** Test operator {@code SumByKey}. */
diff --git
a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/TopPerKeyTest.java
b/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/TopPerKeyTest.java
similarity index 93%
rename from
sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/TopPerKeyTest.java
rename to
sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/TopPerKeyTest.java
index 97756244d7d..2679dfa6239 100644
---
a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/TopPerKeyTest.java
+++
b/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/TopPerKeyTest.java
@@ -15,18 +15,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.sdk.extensions.euphoria.operator.test;
+package org.apache.beam.sdk.extensions.euphoria.beam.testkit;
import static java.util.Arrays.asList;
import java.io.Serializable;
import java.util.List;
import java.util.Objects;
+import
org.apache.beam.sdk.extensions.euphoria.beam.testkit.junit.AbstractOperatorTest;
+import org.apache.beam.sdk.extensions.euphoria.beam.testkit.junit.Processing;
import org.apache.beam.sdk.extensions.euphoria.core.client.dataset.Dataset;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.TopPerKey;
import org.apache.beam.sdk.extensions.euphoria.core.client.util.Triple;
-import
org.apache.beam.sdk.extensions.euphoria.operator.test.junit.AbstractOperatorTest;
-import org.apache.beam.sdk.extensions.euphoria.operator.test.junit.Processing;
import org.junit.Test;
/**
diff --git
a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/UnionTest.java
b/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/UnionTest.java
similarity index 95%
rename from
sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/UnionTest.java
rename to
sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/UnionTest.java
index 18c06d3f830..15499e5eaea 100644
---
a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/UnionTest.java
+++
b/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/UnionTest.java
@@ -15,16 +15,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.sdk.extensions.euphoria.operator.test;
+package org.apache.beam.sdk.extensions.euphoria.beam.testkit;
import java.util.Arrays;
import java.util.List;
+import
org.apache.beam.sdk.extensions.euphoria.beam.testkit.junit.AbstractOperatorTest;
+import org.apache.beam.sdk.extensions.euphoria.beam.testkit.junit.Processing;
import org.apache.beam.sdk.extensions.euphoria.core.client.dataset.Dataset;
import org.apache.beam.sdk.extensions.euphoria.core.client.flow.Flow;
import org.apache.beam.sdk.extensions.euphoria.core.client.io.ListDataSource;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.Union;
-import
org.apache.beam.sdk.extensions.euphoria.operator.test.junit.AbstractOperatorTest;
-import org.apache.beam.sdk.extensions.euphoria.operator.test.junit.Processing;
import org.junit.Test;
/** Test for operator {@code Union}. */
diff --git
a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/Util.java
b/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/Util.java
similarity index 96%
rename from
sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/Util.java
rename to
sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/Util.java
index 2292d9f85b8..459d3403ec4 100644
---
a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/Util.java
+++
b/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/Util.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.sdk.extensions.euphoria.operator.test;
+package org.apache.beam.sdk.extensions.euphoria.beam.testkit;
import java.util.ArrayList;
import java.util.Collection;
diff --git
a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/WatermarkTest.java
b/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/WatermarkTest.java
similarity index 94%
rename from
sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/WatermarkTest.java
rename to
sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/WatermarkTest.java
index b74257b1ef0..d85dfe2ed96 100644
---
a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/WatermarkTest.java
+++
b/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/WatermarkTest.java
@@ -15,11 +15,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.sdk.extensions.euphoria.operator.test;
+package org.apache.beam.sdk.extensions.euphoria.beam.testkit;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
+import
org.apache.beam.sdk.extensions.euphoria.beam.testkit.junit.AbstractOperatorTest;
+import org.apache.beam.sdk.extensions.euphoria.beam.testkit.junit.Processing;
import org.apache.beam.sdk.extensions.euphoria.core.client.dataset.Dataset;
import
org.apache.beam.sdk.extensions.euphoria.core.client.dataset.windowing.Time;
import
org.apache.beam.sdk.extensions.euphoria.core.client.dataset.windowing.TimeInterval;
@@ -29,8 +31,6 @@
import
org.apache.beam.sdk.extensions.euphoria.core.client.operator.MapElements;
import org.apache.beam.sdk.extensions.euphoria.core.client.util.Pair;
import org.apache.beam.sdk.extensions.euphoria.core.client.util.Triple;
-import
org.apache.beam.sdk.extensions.euphoria.operator.test.junit.AbstractOperatorTest;
-import org.apache.beam.sdk.extensions.euphoria.operator.test.junit.Processing;
import org.junit.Test;
/**
diff --git
a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/WindowingTest.java
b/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/WindowingTest.java
similarity index 98%
rename from
sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/WindowingTest.java
rename to
sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/WindowingTest.java
index c8fd5b5e5fa..8e270bdaf74 100644
---
a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/WindowingTest.java
+++
b/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/WindowingTest.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.sdk.extensions.euphoria.operator.test;
+package org.apache.beam.sdk.extensions.euphoria.beam.testkit;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
@@ -27,6 +27,8 @@
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
+import
org.apache.beam.sdk.extensions.euphoria.beam.testkit.junit.AbstractOperatorTest;
+import org.apache.beam.sdk.extensions.euphoria.beam.testkit.junit.Processing;
import org.apache.beam.sdk.extensions.euphoria.core.client.dataset.Dataset;
import
org.apache.beam.sdk.extensions.euphoria.core.client.dataset.windowing.MergingWindowing;
import
org.apache.beam.sdk.extensions.euphoria.core.client.dataset.windowing.Session;
@@ -52,8 +54,6 @@
import org.apache.beam.sdk.extensions.euphoria.core.client.util.Pair;
import org.apache.beam.sdk.extensions.euphoria.core.client.util.Sums;
import org.apache.beam.sdk.extensions.euphoria.core.client.util.Triple;
-import
org.apache.beam.sdk.extensions.euphoria.operator.test.junit.AbstractOperatorTest;
-import org.apache.beam.sdk.extensions.euphoria.operator.test.junit.Processing;
import org.junit.Test;
/** Tests capabilities of {@link Windowing}. */
diff --git
a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/accumulators/LongCounter.java
b/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/accumulators/LongCounter.java
similarity index 94%
rename from
sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/accumulators/LongCounter.java
rename to
sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/accumulators/LongCounter.java
index 0f04d931c3a..9e0691f8c10 100644
---
a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/accumulators/LongCounter.java
+++
b/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/accumulators/LongCounter.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.sdk.extensions.euphoria.operator.test.accumulators;
+package org.apache.beam.sdk.extensions.euphoria.beam.testkit.accumulators;
import java.util.concurrent.atomic.AtomicLong;
import
org.apache.beam.sdk.extensions.euphoria.core.client.accumulators.Counter;
diff --git
a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/accumulators/LongHistogram.java
b/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/accumulators/LongHistogram.java
similarity index 95%
rename from
sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/accumulators/LongHistogram.java
rename to
sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/accumulators/LongHistogram.java
index 9eb809a7041..7ebe69fd8fc 100644
---
a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/accumulators/LongHistogram.java
+++
b/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/accumulators/LongHistogram.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.sdk.extensions.euphoria.operator.test.accumulators;
+package org.apache.beam.sdk.extensions.euphoria.beam.testkit.accumulators;
import java.util.HashMap;
import java.util.Map;
diff --git
a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/accumulators/NanosecondTimer.java
b/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/accumulators/NanosecondTimer.java
similarity index 95%
rename from
sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/accumulators/NanosecondTimer.java
rename to
sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/accumulators/NanosecondTimer.java
index 13919e05610..f8454e710f6 100644
---
a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/accumulators/NanosecondTimer.java
+++
b/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/accumulators/NanosecondTimer.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.sdk.extensions.euphoria.operator.test.accumulators;
+package org.apache.beam.sdk.extensions.euphoria.beam.testkit.accumulators;
import com.google.common.collect.Maps;
import java.time.Duration;
diff --git
a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/accumulators/SingleJvmAccumulatorProvider.java
b/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/accumulators/SingleJvmAccumulatorProvider.java
similarity index 98%
rename from
sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/accumulators/SingleJvmAccumulatorProvider.java
rename to
sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/accumulators/SingleJvmAccumulatorProvider.java
index e66f81496d0..53b969044c5 100644
---
a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/accumulators/SingleJvmAccumulatorProvider.java
+++
b/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/accumulators/SingleJvmAccumulatorProvider.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.sdk.extensions.euphoria.operator.test.accumulators;
+package org.apache.beam.sdk.extensions.euphoria.beam.testkit.accumulators;
import java.io.ObjectStreamException;
import java.time.Duration;
diff --git
a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/accumulators/SnapshotProvider.java
b/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/accumulators/SnapshotProvider.java
similarity index 93%
rename from
sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/accumulators/SnapshotProvider.java
rename to
sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/accumulators/SnapshotProvider.java
index ce10795cd12..8263664b209 100644
---
a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/accumulators/SnapshotProvider.java
+++
b/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/accumulators/SnapshotProvider.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.sdk.extensions.euphoria.operator.test.accumulators;
+package org.apache.beam.sdk.extensions.euphoria.beam.testkit.accumulators;
import java.time.Duration;
import java.util.Map;
diff --git
a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/accumulators/Snapshotable.java
b/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/accumulators/Snapshotable.java
similarity index 91%
rename from
sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/accumulators/Snapshotable.java
rename to
sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/accumulators/Snapshotable.java
index 60e3db3a4b6..0af544b49c3 100644
---
a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/accumulators/Snapshotable.java
+++
b/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/accumulators/Snapshotable.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.sdk.extensions.euphoria.operator.test.accumulators;
+package org.apache.beam.sdk.extensions.euphoria.beam.testkit.accumulators;
interface Snapshotable<V> {
V getSnapshot();
diff --git
a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/accumulators/package-info.java
b/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/accumulators/package-info.java
similarity index 91%
rename from
sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/accumulators/package-info.java
rename to
sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/accumulators/package-info.java
index 03b178aefc1..a631664331a 100644
---
a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/accumulators/package-info.java
+++
b/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/accumulators/package-info.java
@@ -19,4 +19,4 @@
/**
* Accumulators to be used in operator test suite.
*/
-package org.apache.beam.sdk.extensions.euphoria.operator.test.accumulators;
+package org.apache.beam.sdk.extensions.euphoria.beam.testkit.accumulators;
diff --git
a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/junit/AbstractOperatorTest.java
b/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/junit/AbstractOperatorTest.java
similarity index 94%
rename from
sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/junit/AbstractOperatorTest.java
rename to
sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/junit/AbstractOperatorTest.java
index a4dba44914a..d39f4ea9077 100644
---
a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/junit/AbstractOperatorTest.java
+++
b/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/junit/AbstractOperatorTest.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.sdk.extensions.euphoria.operator.test.junit;
+package org.apache.beam.sdk.extensions.euphoria.beam.testkit.junit;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.junit.Assert.assertEquals;
@@ -26,6 +26,9 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
+import
org.apache.beam.sdk.extensions.euphoria.beam.testkit.accumulators.SingleJvmAccumulatorProvider;
+import
org.apache.beam.sdk.extensions.euphoria.beam.testkit.accumulators.SnapshotProvider;
+import
org.apache.beam.sdk.extensions.euphoria.beam.testkit.junit.Processing.Type;
import org.apache.beam.sdk.extensions.euphoria.core.client.dataset.Dataset;
import org.apache.beam.sdk.extensions.euphoria.core.client.flow.Flow;
import org.apache.beam.sdk.extensions.euphoria.core.client.io.DataSource;
@@ -33,10 +36,6 @@
import org.apache.beam.sdk.extensions.euphoria.core.client.io.ListDataSource;
import org.apache.beam.sdk.extensions.euphoria.core.executor.Executor;
import org.apache.beam.sdk.extensions.euphoria.core.util.Settings;
-import org.apache.beam.sdk.extensions.euphoria.operator.test.accumulators
- .SingleJvmAccumulatorProvider;
-import
org.apache.beam.sdk.extensions.euphoria.operator.test.accumulators.SnapshotProvider;
-import
org.apache.beam.sdk.extensions.euphoria.operator.test.junit.Processing.Type;
/**
* Base class for test description of a test case.
@@ -87,8 +86,8 @@
*/
@SuppressWarnings("unchecked")
public void execute(TestCase tc) {
- checkNotNull(executor);
- checkNotNull(processing);
+ checkNotNull(executor, "Cannot run test without executor.");
+ checkNotNull(processing, "Cannot run test when processing type is not available.");
SingleJvmAccumulatorProvider.Factory accs =
SingleJvmAccumulatorProvider.Factory.get();
executor.setAccumulatorProvider(accs);
diff --git
a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/junit/ExecutorEnvironment.java
b/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/junit/ExecutorEnvironment.java
similarity index 93%
rename from
sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/junit/ExecutorEnvironment.java
rename to
sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/junit/ExecutorEnvironment.java
index 33fc3bb2471..37a988f8d6b 100644
---
a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/junit/ExecutorEnvironment.java
+++
b/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/junit/ExecutorEnvironment.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.sdk.extensions.euphoria.operator.test.junit;
+package org.apache.beam.sdk.extensions.euphoria.beam.testkit.junit;
import org.apache.beam.sdk.extensions.euphoria.core.executor.Executor;
diff --git
a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/junit/ExecutorProvider.java
b/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/junit/ExecutorProvider.java
similarity index 93%
rename from
sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/junit/ExecutorProvider.java
rename to
sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/junit/ExecutorProvider.java
index 510ba81d3ee..b60c62412d3 100644
---
a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/junit/ExecutorProvider.java
+++
b/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/junit/ExecutorProvider.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.sdk.extensions.euphoria.operator.test.junit;
+package org.apache.beam.sdk.extensions.euphoria.beam.testkit.junit;
/**
* Provider of {@lin ExecutorEnvironment}.
diff --git
a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/junit/ExecutorProviderRunner.java
b/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/junit/ExecutorProviderRunner.java
similarity index 98%
rename from
sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/junit/ExecutorProviderRunner.java
rename to
sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/junit/ExecutorProviderRunner.java
index 5f9596d44e2..2eb991cc049 100644
---
a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/junit/ExecutorProviderRunner.java
+++
b/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/junit/ExecutorProviderRunner.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.sdk.extensions.euphoria.operator.test.junit;
+package org.apache.beam.sdk.extensions.euphoria.beam.testkit.junit;
import static com.google.common.base.Preconditions.checkArgument;
@@ -27,7 +27,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Stream;
-import
org.apache.beam.sdk.extensions.euphoria.operator.test.junit.Processing.Type;
+import
org.apache.beam.sdk.extensions.euphoria.beam.testkit.junit.Processing.Type;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.Description;
diff --git
a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/junit/Processing.java
b/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/junit/Processing.java
similarity index 96%
rename from
sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/junit/Processing.java
rename to
sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/junit/Processing.java
index e68543439eb..8a2989baf49 100644
---
a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/junit/Processing.java
+++
b/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/junit/Processing.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.sdk.extensions.euphoria.operator.test.junit;
+package org.apache.beam.sdk.extensions.euphoria.beam.testkit.junit;
import com.google.common.collect.Lists;
import java.lang.annotation.Inherited;
diff --git
a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/junit/package-info.java
b/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/junit/package-info.java
similarity index 92%
rename from
sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/junit/package-info.java
rename to
sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/junit/package-info.java
index ca6d76ba388..eb2c82cbe6c 100644
---
a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/junit/package-info.java
+++
b/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/junit/package-info.java
@@ -19,4 +19,4 @@
/**
* Collection of tst utility classes.
*/
-package org.apache.beam.sdk.extensions.euphoria.operator.test.junit;
+package org.apache.beam.sdk.extensions.euphoria.beam.testkit.junit;
diff --git
a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/package-info.java
b/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/package-info.java
similarity index 93%
rename from
sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/package-info.java
rename to
sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/package-info.java
index 9692a161aa5..11e2cc69836 100644
---
a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/package-info.java
+++
b/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/package-info.java
@@ -20,4 +20,4 @@
* Euphoria operators test suite, see
* {@link
org.apache.beam.sdk.extensions.euphoria.operator.test.AllOperatorsSuite}.
*/
-package org.apache.beam.sdk.extensions.euphoria.operator.test;
+package org.apache.beam.sdk.extensions.euphoria.beam.testkit;
diff --git a/sdks/java/extensions/euphoria/euphoria-core/build.gradle
b/sdks/java/extensions/euphoria/euphoria-core/build.gradle
index aec8b36e155..e2b43bd767d 100644
--- a/sdks/java/extensions/euphoria/euphoria-core/build.gradle
+++ b/sdks/java/extensions/euphoria/euphoria-core/build.gradle
@@ -19,7 +19,7 @@
apply from: project(":").file("build_rules.gradle")
applyJavaNature(enableFindbugs: false)
-description = "Apache Beam :: SDKs :: Java :: Extensions :: Euphoria Java 8 DSL"
+description = "Apache Beam :: SDKs :: Java :: Extensions :: Euphoria Java 8 DSL :: Core"
dependencies {
shadow library.java.guava
diff --git a/sdks/java/extensions/euphoria/euphoria-local/build.gradle
b/sdks/java/extensions/euphoria/euphoria-local/build.gradle
index 2d37e8e12ea..d02c4d02771 100644
--- a/sdks/java/extensions/euphoria/euphoria-local/build.gradle
+++ b/sdks/java/extensions/euphoria/euphoria-local/build.gradle
@@ -23,6 +23,6 @@ dependencies {
compile project(':beam-sdks-java-extensions-euphoria-core')
compileOnly library.java.findbugs_jsr305
testCompile project(':beam-sdks-java-extensions-euphoria-testing')
- testCompile project(':beam-sdks-java-extensions-euphoria-operator-testkit')
+ testCompile project(':beam-sdks-java-extensions-euphoria-beam')
// testCompile project(path: ':beam-sdks-java-extensions-euphoria-core', configuration: 'testArtifact')
}
diff --git
a/sdks/java/extensions/euphoria/euphoria-local/src/test/java/org/apache/beam/sdk/extensions/euphoria/executor/local/testkit/LocalExecutorProvider.java
b/sdks/java/extensions/euphoria/euphoria-local/src/test/java/org/apache/beam/sdk/extensions/euphoria/executor/local/testkit/LocalExecutorProvider.java
deleted file mode 100644
index 4ea6a7fda15..00000000000
---
a/sdks/java/extensions/euphoria/euphoria-local/src/test/java/org/apache/beam/sdk/extensions/euphoria/executor/local/testkit/LocalExecutorProvider.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.euphoria.executor.local.testkit;
-
-import org.apache.beam.sdk.extensions.euphoria.core.executor.Executor;
-import org.apache.beam.sdk.extensions.euphoria.executor.local.LocalExecutor;
-import
org.apache.beam.sdk.extensions.euphoria.executor.local.WatermarkTriggerScheduler;
-import
org.apache.beam.sdk.extensions.euphoria.operator.test.junit.ExecutorEnvironment;
-import
org.apache.beam.sdk.extensions.euphoria.operator.test.junit.ExecutorProvider;
-
-/**
- * Executor provider used for testing.
- */
-public interface LocalExecutorProvider extends ExecutorProvider {
- @Override
- default ExecutorEnvironment newExecutorEnvironment() throws Exception {
- LocalExecutor exec =
- new LocalExecutor()
- .setTriggeringSchedulerSupplier(() -> new WatermarkTriggerScheduler(500L));
- return new ExecutorEnvironment() {
- @Override
- public Executor getExecutor() {
- return exec;
- }
-
- @Override
- public void shutdown() throws Exception {
- exec.shutdown();
- }
- };
- }
-}
diff --git
a/sdks/java/extensions/euphoria/euphoria-local/src/test/java/org/apache/beam/sdk/extensions/euphoria/executor/local/testkit/LocalOperatorTest.java
b/sdks/java/extensions/euphoria/euphoria-local/src/test/java/org/apache/beam/sdk/extensions/euphoria/executor/local/testkit/LocalOperatorTest.java
deleted file mode 100644
index 43335e899bb..00000000000
---
a/sdks/java/extensions/euphoria/euphoria-local/src/test/java/org/apache/beam/sdk/extensions/euphoria/executor/local/testkit/LocalOperatorTest.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.euphoria.executor.local.testkit;
-
-import
org.apache.beam.sdk.extensions.euphoria.operator.test.suite.OperatorsTestSuite;
-import org.junit.Ignore;
-
-/**
- * Local operator test suite.
- */
-@Ignore("Local executor do not supports beam widowing.")
-public class LocalOperatorTest extends OperatorsTestSuite implements
LocalExecutorProvider {}
diff --git
a/sdks/java/extensions/euphoria/euphoria-operator-testkit/build.gradle
b/sdks/java/extensions/euphoria/euphoria-operator-testkit/build.gradle
deleted file mode 100644
index 4b0a2b7027b..00000000000
--- a/sdks/java/extensions/euphoria/euphoria-operator-testkit/build.gradle
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-apply from: project(":").file("build_rules.gradle")
-applyJavaNature()
-
-description = "Apache Beam :: SDKs :: Java :: Extensions :: Euphoria Java 8
DSL"
-
-dependencies {
- compile project(':beam-sdks-java-extensions-euphoria-beam')
- testCompile project(':beam-runners-direct-java')
- testCompile project(':beam-sdks-java-extensions-euphoria-testing')
- shadow 'com.google.code.findbugs:annotations:3.0.1'
- compileOnly library.java.junit
- testCompile library.java.slf4j_api
-}
-
-test.testLogging.showStandardStreams = true
\ No newline at end of file
diff --git
a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/suite/OperatorsTestSuite.java
b/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/suite/OperatorsTestSuite.java
deleted file mode 100644
index 6c62cfbf4db..00000000000
---
a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/suite/OperatorsTestSuite.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.euphoria.operator.test.suite;
-
-import org.apache.beam.sdk.extensions.euphoria.operator.test.FlatMapTest;
-import org.apache.beam.sdk.extensions.euphoria.operator.test.JoinTest;
-import org.apache.beam.sdk.extensions.euphoria.operator.test.ReduceByKeyTest;
-import org.apache.beam.sdk.extensions.euphoria.operator.test.UnionTest;
-import
org.apache.beam.sdk.extensions.euphoria.operator.test.junit.ExecutorProvider;
-import
org.apache.beam.sdk.extensions.euphoria.operator.test.junit.ExecutorProviderRunner;
-import org.junit.runner.RunWith;
-import org.junit.runners.Suite;
-
-/**
- * Euphoria operators test suite.
- */
-@RunWith(ExecutorProviderRunner.class)
-@Suite.SuiteClasses({
- // BroadcastHashJoinTest.class,
- // CountByKeyTest.class,
- // DistinctTest.class,
- // FilterTest.class,
- FlatMapTest.class,
- JoinTest.class,
- // JoinWindowEnforcementTest.class,
- // MapElementsTest.class,
- ReduceByKeyTest.class,
- // ReduceStateByKeyTest.class,
- // SumByKeyTest.class,
- // TopPerKeyTest.class,
- // SortTest.class,
- UnionTest.class,
- // WindowingTest.class,
- // WatermarkTest.class,
-})
-public abstract class OperatorsTestSuite implements ExecutorProvider {
-
-}
diff --git
a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/suite/package-info.java
b/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/suite/package-info.java
deleted file mode 100644
index 09e896b89ea..00000000000
---
a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/suite/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Test suite package.
- */
-package org.apache.beam.sdk.extensions.euphoria.operator.test.suite;
diff --git
a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/test/java/org/apache/beam/sdk/extensions/euphoria/operator/test/OperatorsTest.java
b/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/test/java/org/apache/beam/sdk/extensions/euphoria/operator/test/OperatorsTest.java
deleted file mode 100644
index c3748ff53a7..00000000000
---
a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/test/java/org/apache/beam/sdk/extensions/euphoria/operator/test/OperatorsTest.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.euphoria.operator.test;
-
-import
org.apache.beam.sdk.extensions.euphoria.operator.test.suite.OperatorsTestSuite;
-
-/**
- * Test which runs whole {@link OperatorsTestSuite}.
- */
-public class OperatorsTest extends OperatorsTestSuite implements
BeamExecutorProvider {
-
-}
diff --git a/settings.gradle b/settings.gradle
index 8adbab9dbd7..ff2c1b2ece6 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -79,8 +79,6 @@ include "beam-sdks-java-extensions-euphoria-testing"
project(":beam-sdks-java-extensions-euphoria-testing").dir =
file("sdks/java/extensions/euphoria/euphoria-testing")
include "beam-sdks-java-extensions-euphoria-beam"
project(":beam-sdks-java-extensions-euphoria-beam").dir =
file("sdks/java/extensions/euphoria/euphoria-beam")
-include "beam-sdks-java-extensions-euphoria-operator-testkit"
-project(":beam-sdks-java-extensions-euphoria-operator-testkit").dir =
file("sdks/java/extensions/euphoria/euphoria-operator-testkit")
include "beam-sdks-java-extensions-euphoria-local"
project(":beam-sdks-java-extensions-euphoria-local").dir =
file("sdks/java/extensions/euphoria/euphoria-local")
include "beam-sdks-java-extensions-euphoria-fluent"
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 109393)
Time Spent: 1h 50m (was: 1h 40m)
> Broadcast Joins
> ---------------
>
> Key: BEAM-4410
> URL: https://issues.apache.org/jira/browse/BEAM-4410
> Project: Beam
> Issue Type: New Feature
> Components: dsl-euphoria
> Reporter: Marek Simunek
> Assignee: Marek Simunek
> Priority: Major
> Time Spent: 1h 50m
> Remaining Estimate: 0h
>
> Translation of Broadcast Join. A broadcast join can be very efficient for joins
> between a large dataset and a small dataset that has to fit into memory. It's
> implemented with Beam's `sideInput`.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)