[https://issues.apache.org/jira/browse/BEAM-3900?focusedWorklogId=108519&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-108519]
ASF GitHub Bot logged work on BEAM-3900:
----------------------------------------
Author: ASF GitHub Bot
Created on: 04/Jun/18 11:50
Start Date: 04/Jun/18 11:50
Worklog Time Spent: 10m
Work Description: jbonofre closed pull request #5447: "[BEAM-3900] Join
operator translation, Beam windowing, new tests from operator testsuite enabled".
URL: https://github.com/apache/beam/pull/5447
This pull request was merged from a forked repository. Because GitHub does not
show the original diff of a merged pull request from a fork, that diff is
reproduced below for the sake of provenance:
diff --git a/.test-infra/jenkins/job_beam_PreCommit_Java_GradleBuild.groovy
b/.test-infra/jenkins/job_beam_PreCommit_Java_GradleBuild.groovy
index 56af6da64a4..ce67d399ea6 100644
--- a/.test-infra/jenkins/job_beam_PreCommit_Java_GradleBuild.groovy
+++ b/.test-infra/jenkins/job_beam_PreCommit_Java_GradleBuild.groovy
@@ -44,8 +44,6 @@ job('beam_PreCommit_Java_GradleBuild') {
rootBuildScriptDir(common_job_properties.checkoutDir)
tasks(':javaPreCommit')
common_job_properties.setGradleSwitches(delegate)
- // Specify maven home on Jenkins, needed by Maven archetype integration
tests.
- switches('-Pmaven_home=/home/jenkins/tools/maven/apache-maven-3.5.2')
}
}
}
diff --git a/build.gradle b/build.gradle
index b00cfa27f54..840ce611e04 100644
--- a/build.gradle
+++ b/build.gradle
@@ -137,8 +137,6 @@ task javaPreCommit() {
dependsOn ":rat"
dependsOn ":beam-sdks-java-core:buildNeeded"
dependsOn ":beam-sdks-java-core:buildDependents"
- dependsOn
":beam-sdks-java-maven-archetypes-examples:generateAndBuildArchetypeTest"
- dependsOn
":beam-sdks-java-maven-archetypes-starter:generateAndBuildArchetypeTest"
dependsOn ":beam-examples-java:preCommit"
}
diff --git a/sdks/java/extensions/euphoria/euphoria-beam/build.gradle
b/sdks/java/extensions/euphoria/euphoria-beam/build.gradle
index 2d5cb36e453..60d35bdc101 100644
--- a/sdks/java/extensions/euphoria/euphoria-beam/build.gradle
+++ b/sdks/java/extensions/euphoria/euphoria-beam/build.gradle
@@ -29,12 +29,10 @@ dependencies {
compile "com.esotericsoftware.kryo:kryo:${kryoVersion}"
shadow project(path: ":beam-sdks-java-core", configuration: "shadow")
shadow 'com.google.code.findbugs:annotations:3.0.1'
- testCompile project(':beam-sdks-java-extensions-euphoria-operator-testkit')
testCompile project(':beam-sdks-java-extensions-euphoria-testing')
testCompile project(':beam-runners-direct-java')
testCompile library.java.slf4j_api
testCompile library.java.hamcrest_core
-
}
test.testLogging.showStandardStreams = true
diff --git
a/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/org/apache/beam/sdk/extensions/euphoria/beam/FlowTranslator.java
b/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/org/apache/beam/sdk/extensions/euphoria/beam/FlowTranslator.java
index e03a65e424a..581abec9f46 100644
---
a/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/org/apache/beam/sdk/extensions/euphoria/beam/FlowTranslator.java
+++
b/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/org/apache/beam/sdk/extensions/euphoria/beam/FlowTranslator.java
@@ -17,14 +17,17 @@
*/
package org.apache.beam.sdk.extensions.euphoria.beam;
-import java.util.IdentityHashMap;
-import java.util.Map;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
+import java.util.Collection;
+import javax.annotation.Nullable;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.euphoria.beam.io.BeamWriteSink;
import
org.apache.beam.sdk.extensions.euphoria.core.client.accumulators.AccumulatorProvider;
import org.apache.beam.sdk.extensions.euphoria.core.client.flow.Flow;
import org.apache.beam.sdk.extensions.euphoria.core.client.io.DataSink;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.FlatMap;
+import org.apache.beam.sdk.extensions.euphoria.core.client.operator.Join;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.Operator;
import
org.apache.beam.sdk.extensions.euphoria.core.client.operator.ReduceByKey;
import
org.apache.beam.sdk.extensions.euphoria.core.client.operator.ReduceStateByKey;
@@ -43,8 +46,10 @@
*/
class FlowTranslator {
- private static final Map<Class, OperatorTranslator> translators = new
IdentityHashMap<>();
+ private static final Multimap<Class, OperatorTranslator> translators =
ArrayListMultimap.create();
+ //Note that when there are more than one translator ordering defines
priority.
+ //First added to `translators` is first asked whenever it can translate the
operator.
static {
translators.put(FlowUnfolder.InputOperator.class, new InputTranslator());
translators.put(FlatMap.class, new FlatMapTranslator());
@@ -54,6 +59,40 @@
// extended operators
translators.put(ReduceByKey.class, new ReduceByKeyTranslator());
translators.put(ReduceStateByKey.class, new ReduceStateByKeyTranslator());
+ translators.put(Join.class, new JoinTranslator());
+ }
+
+ @SuppressWarnings("unchecked")
+ private static boolean isOperatorDirectlyTranslatable(Operator operator){
+ Collection<OperatorTranslator> availableTranslators =
translators.get(operator.getClass());
+ if (availableTranslators.isEmpty()){
+ return false;
+ }
+
+ for (OperatorTranslator translator : availableTranslators){
+ if (translator.canTranslate(operator)){
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ @Nullable
+ @SuppressWarnings("unchecked")
+ private static OperatorTranslator getTranslatorIfAvailable(Operator
operator){
+ Collection<OperatorTranslator> availableTranslators =
translators.get(operator.getClass());
+ if (availableTranslators.isEmpty()){
+ return null;
+ }
+
+ for (OperatorTranslator translator : availableTranslators){
+ if (translator.canTranslate(operator)){
+ return translator;
+ }
+ }
+
+ return null;
}
static Pipeline toPipeline(
@@ -75,12 +114,12 @@ static Pipeline toPipeline(
static DAG<Operator<?, ?>> toDAG(Flow flow) {
final DAG<Operator<?, ?>> dag =
- FlowUnfolder.unfold(flow, operator ->
translators.containsKey(operator.getClass()));
+ FlowUnfolder.unfold(flow,
FlowTranslator::isOperatorDirectlyTranslatable);
return dag;
}
static DAG<Operator<?, ?>> unfold(DAG<Operator<?, ?>> dag) {
- return FlowUnfolder.translate(dag, operator ->
translators.containsKey(operator.getClass()));
+ return FlowUnfolder.translate(dag,
FlowTranslator::isOperatorDirectlyTranslatable);
}
@SuppressWarnings("unchecked")
@@ -91,7 +130,7 @@ static void updateContextBy(DAG<Operator<?, ?>> dag,
BeamExecutorContext context
.map(Node::get)
.forEach(
op -> {
- final OperatorTranslator translator =
translators.get(op.getClass());
+ final OperatorTranslator translator =
getTranslatorIfAvailable(op);
if (translator == null) {
throw new UnsupportedOperationException(
"Operator " + op.getClass().getSimpleName() + " not
supported");
diff --git
a/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/org/apache/beam/sdk/extensions/euphoria/beam/JoinTranslator.java
b/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/org/apache/beam/sdk/extensions/euphoria/beam/JoinTranslator.java
new file mode 100644
index 00000000000..7eaa336501b
--- /dev/null
+++
b/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/org/apache/beam/sdk/extensions/euphoria/beam/JoinTranslator.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.euphoria.beam;
+
+import java.util.List;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.extensions.euphoria.beam.common.InputToKvDoFn;
+import org.apache.beam.sdk.extensions.euphoria.beam.io.KryoCoder;
+import org.apache.beam.sdk.extensions.euphoria.beam.join.FullJoinFn;
+import org.apache.beam.sdk.extensions.euphoria.beam.join.InnerJoinFn;
+import org.apache.beam.sdk.extensions.euphoria.beam.join.JoinFn;
+import org.apache.beam.sdk.extensions.euphoria.beam.join.LeftOuterJoinFn;
+import org.apache.beam.sdk.extensions.euphoria.beam.join.RightOuterJoinFn;
+import org.apache.beam.sdk.extensions.euphoria.beam.window.WindowingUtils;
+import
org.apache.beam.sdk.extensions.euphoria.core.client.dataset.windowing.Window;
+import
org.apache.beam.sdk.extensions.euphoria.core.client.functional.BinaryFunctor;
+import
org.apache.beam.sdk.extensions.euphoria.core.client.functional.UnaryFunction;
+import org.apache.beam.sdk.extensions.euphoria.core.client.operator.Join;
+import org.apache.beam.sdk.extensions.euphoria.core.client.util.Pair;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.join.CoGbkResult;
+import org.apache.beam.sdk.transforms.join.CoGroupByKey;
+import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TupleTag;
+
+
+/**
+ * {@link OperatorTranslator Translator } for Euphoria {@link Join} operator.
+ */
+public class JoinTranslator implements OperatorTranslator<Join> {
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public PCollection<?> translate(Join operator, BeamExecutorContext context) {
+ return doTranslate(operator, context);
+ }
+
+
+ public <K, LeftT, RightT, OutputT, W extends Window<W>> PCollection<Pair<K,
OutputT>>
+ doTranslate(Join<LeftT, RightT, K, OutputT, W> operator, BeamExecutorContext
context) {
+
+ Coder<K> keyCoder = context.getCoder(operator.getLeftKeyExtractor());
+
+ // get input data-sets transformed to Pcollections<KV<K,LeftT/RightT>>
+ List<PCollection<Object>> inputs = context.getInputs(operator);
+
+ PCollection<KV<K, LeftT>> leftKvInput = getKVInputCollection(inputs.get(0),
+ operator.getLeftKeyExtractor(),
+ keyCoder, new KryoCoder<>(), "::extract-keys-left");
+
+ PCollection<KV<K, RightT>> rightKvInput =
getKVInputCollection(inputs.get(1),
+ operator.getRightKeyExtractor(),
+ keyCoder, new KryoCoder<>(), "::extract-keys-right");
+
+ // and apply the same widowing on input Pcolections since the
documentation states:
+ //'all of the PCollections you want to group must use the same
+ // windowing strategy and window sizing'
+ leftKvInput = WindowingUtils.applyWindowingIfSpecified(
+ operator, leftKvInput, context.getAllowedLateness(operator));
+ rightKvInput = WindowingUtils.applyWindowingIfSpecified(
+ operator, rightKvInput, context.getAllowedLateness(operator));
+
+ // GoGroupByKey collections
+ TupleTag<LeftT> leftTag = new TupleTag<>();
+ TupleTag<RightT> rightTag = new TupleTag<>();
+
+ PCollection<KV<K, CoGbkResult>> coGrouped = KeyedPCollectionTuple
+ .of(leftTag, leftKvInput)
+ .and(rightTag, rightKvInput)
+ .apply("::co-group-by-key", CoGroupByKey.create());
+
+ // Join
+ JoinFn<LeftT, RightT, K, OutputT> joinFn = chooseJoinFn(operator, leftTag,
rightTag);
+
+ return coGrouped.apply(joinFn.getFnName(), ParDo.of(joinFn));
+ }
+
+ private <K, ValueT> PCollection<KV<K, ValueT>> getKVInputCollection(
+ PCollection<Object> inputPCollection,
+ UnaryFunction<ValueT, K> keyExtractor,
+ Coder<K> keyCoder, Coder<ValueT> valueCoder, String transformName) {
+
+ @SuppressWarnings("unchecked")
+ PCollection<ValueT> typedInput = (PCollection<ValueT>) inputPCollection;
+ typedInput.setCoder(valueCoder);
+
+ PCollection<KV<K, ValueT>> kvInput =
+ typedInput.apply(transformName, ParDo.of(new
InputToKvDoFn<>(keyExtractor)));
+ kvInput.setCoder(KvCoder.of(keyCoder, valueCoder));
+
+ return kvInput;
+ }
+
+ private <K, LeftT, RightT, OutputT, W extends Window<W>> JoinFn<LeftT,
RightT, K, OutputT>
+ chooseJoinFn(
+ Join<LeftT, RightT, K, OutputT, W> operator, TupleTag<LeftT> leftTag,
+ TupleTag<RightT> rightTag) {
+
+ JoinFn<LeftT, RightT, K, OutputT> joinFn;
+ BinaryFunctor<LeftT, RightT, OutputT> joiner = operator.getJoiner();
+
+ switch (operator.getType()) {
+ case INNER:
+ joinFn = new InnerJoinFn<>(joiner, leftTag, rightTag);
+ break;
+ case LEFT:
+ joinFn = new LeftOuterJoinFn<>(joiner, leftTag, rightTag);
+ break;
+ case RIGHT:
+ joinFn = new RightOuterJoinFn<>(joiner, leftTag, rightTag);
+ break;
+ case FULL:
+ joinFn = new FullJoinFn<>(joiner, leftTag, rightTag);
+ break;
+
+ default:
+ throw new UnsupportedOperationException(String.format(
+ "Cannot translate Euphoria '%s' operator to Beam transformations."
+ + " Given join type '%s' is not supported.",
+ Join.class.getSimpleName(), operator.getType()));
+ }
+ return joinFn;
+ }
+}
diff --git
a/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/org/apache/beam/sdk/extensions/euphoria/beam/OperatorTranslator.java
b/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/org/apache/beam/sdk/extensions/euphoria/beam/OperatorTranslator.java
index a184fc973b6..f24f3790a55 100644
---
a/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/org/apache/beam/sdk/extensions/euphoria/beam/OperatorTranslator.java
+++
b/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/org/apache/beam/sdk/extensions/euphoria/beam/OperatorTranslator.java
@@ -25,7 +25,6 @@
*
* @param <OperatorT> the type of the user defined euphoria operator definition
*/
-@FunctionalInterface
interface OperatorTranslator<OperatorT extends Operator> {
/**
@@ -36,4 +35,15 @@
* @return a beam transformation
*/
PCollection<?> translate(OperatorT operator, BeamExecutorContext context);
+
+ /**
+ * Returns true when implementing {@link OperatorTranslator} is able to
translate given instance
+ * of an operator, false otherwise.
+ *
+ * <p>This method allow us to have more {@link OperatorTranslator}
+ * implementations for one {@link Operator} in case when some specialized
translators are needed.
+ */
+ default boolean canTranslate(OperatorT operator) {
+ return true;
+ }
}
diff --git
a/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/org/apache/beam/sdk/extensions/euphoria/beam/ReduceByKeyTranslator.java
b/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/org/apache/beam/sdk/extensions/euphoria/beam/ReduceByKeyTranslator.java
index 4104044038f..382f2189ee5 100644
---
a/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/org/apache/beam/sdk/extensions/euphoria/beam/ReduceByKeyTranslator.java
+++
b/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/org/apache/beam/sdk/extensions/euphoria/beam/ReduceByKeyTranslator.java
@@ -17,11 +17,14 @@
*/
package org.apache.beam.sdk.extensions.euphoria.beam;
+
+import static com.google.common.base.Preconditions.checkState;
+
import java.util.stream.StreamSupport;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.extensions.euphoria.beam.window.BeamWindowFn;
+import org.apache.beam.sdk.extensions.euphoria.beam.window.WindowingUtils;
import
org.apache.beam.sdk.extensions.euphoria.core.client.accumulators.AccumulatorProvider;
import
org.apache.beam.sdk.extensions.euphoria.core.client.dataset.windowing.Window;
import
org.apache.beam.sdk.extensions.euphoria.core.client.functional.ReduceFunctor;
@@ -35,7 +38,6 @@
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SimpleFunction;
-import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
@@ -48,6 +50,9 @@
private static <InputT, K, V, OutputT, W extends Window<W>>
PCollection<Pair<K, OutputT>>
doTranslate(ReduceByKey<InputT, K, V, OutputT, W> operator,
BeamExecutorContext context) {
+ //TODO Could we even do values sorting ?
+ checkState(operator.getValueComparator() == null, "Values sorting is not
supported.");
+
final UnaryFunction<InputT, K> keyExtractor = operator.getKeyExtractor();
final UnaryFunction<InputT, V> valueExtractor =
operator.getValueExtractor();
final ReduceFunctor<V, OutputT> reducer = operator.getReducer();
@@ -56,24 +61,8 @@
final Coder<K> keyCoder = context.getCoder(keyExtractor);
final Coder<V> valueCoder = context.getCoder(valueExtractor);
- final PCollection<InputT> input;
-
- // ~ apply windowing if specified
- if (operator.getWindowing() == null) {
- input = context.getInput(operator);
- } else {
- input =
- context
- .getInput(operator)
- .apply(
- operator.getName() + "::windowing",
- org.apache.beam.sdk.transforms.windowing.Window.into(
- BeamWindowFn.wrap(operator.getWindowing()))
- // TODO: trigger
- .triggering(AfterWatermark.pastEndOfWindow())
- .discardingFiredPanes()
-
.withAllowedLateness(context.getAllowedLateness(operator)));
- }
+ final PCollection<InputT> input =
WindowingUtils.applyWindowingIfSpecified(operator,
+ context.getInput(operator), context.getAllowedLateness(operator));
// ~ create key & value extractor
final MapElements<InputT, KV<K, V>> extractor =
@@ -119,11 +108,17 @@
}
}
+ @Override
+ public boolean canTranslate(ReduceByKey operator) {
+ // translation of sorted values is not supported yet
+ return operator.getValueComparator() == null;
+ }
+
private static <InputT, OutputT> SerializableFunction<Iterable<InputT>,
InputT> asCombiner(
ReduceFunctor<InputT, OutputT> reducer) {
- @SuppressWarnings("unchecked")
- final ReduceFunctor<InputT, InputT> combiner = (ReduceFunctor<InputT,
InputT>) reducer;
+ @SuppressWarnings("unchecked") final ReduceFunctor<InputT, InputT>
combiner =
+ (ReduceFunctor<InputT, InputT>) reducer;
final SingleValueCollector<InputT> collector = new
SingleValueCollector<>();
return (Iterable<InputT> input) -> {
combiner.apply(StreamSupport.stream(input.spliterator(), false),
collector);
diff --git
a/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/org/apache/beam/sdk/extensions/euphoria/beam/ReduceStateByKeyTranslator.java
b/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/org/apache/beam/sdk/extensions/euphoria/beam/ReduceStateByKeyTranslator.java
index 7e3ae7718f9..b29ea5351bd 100644
---
a/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/org/apache/beam/sdk/extensions/euphoria/beam/ReduceStateByKeyTranslator.java
+++
b/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/org/apache/beam/sdk/extensions/euphoria/beam/ReduceStateByKeyTranslator.java
@@ -65,6 +65,6 @@
.setCoder(new KryoCoder<>());
}
*/
- throw new UnsupportedOperationException("Not supported yet");
+ throw new UnsupportedOperationException("ReduceStateByKy is not supported
yet.");
}
}
diff --git
a/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/org/apache/beam/sdk/extensions/euphoria/beam/common/InputToKvDoFn.java
b/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/org/apache/beam/sdk/extensions/euphoria/beam/common/InputToKvDoFn.java
new file mode 100644
index 00000000000..30e7bd7d760
--- /dev/null
+++
b/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/org/apache/beam/sdk/extensions/euphoria/beam/common/InputToKvDoFn.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.euphoria.beam.common;
+
+import
org.apache.beam.sdk.extensions.euphoria.core.client.functional.UnaryFunction;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.values.KV;
+
+/**
+ * {@link DoFn} which takes input elements and transforms them to {@link KV}
using given key
+ * extractor.
+ */
+public class InputToKvDoFn<InputT, K> extends DoFn<InputT, KV<K, InputT>> {
+
+ private final UnaryFunction<InputT, K> keyExtractor;
+
+ public InputToKvDoFn(UnaryFunction<InputT, K> keyExtractor) {
+ this.keyExtractor = keyExtractor;
+ }
+
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ InputT element = c.element();
+ K key = keyExtractor.apply(element);
+ c.output(KV.of(key, element));
+ }
+
+}
diff --git
a/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/BeamOperatorsTest.java
b/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/org/apache/beam/sdk/extensions/euphoria/beam/common/package-info.java
similarity index 81%
rename from
sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/BeamOperatorsTest.java
rename to
sdks/java/extensions/euphoria/euphoria-beam/src/main/java/org/apache/beam/sdk/extensions/euphoria/beam/common/package-info.java
index 33e8b246d35..2de3d603025 100644
---
a/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/BeamOperatorsTest.java
+++
b/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/org/apache/beam/sdk/extensions/euphoria/beam/common/package-info.java
@@ -15,11 +15,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.sdk.extensions.euphoria.beam.testkit;
/**
- * TODO: write javadoc.
+ * A set of commonly used classes enabling some code reuse.
*/
-public class BeamOperatorsTest extends BeamOperatorsSuite implements
BeamExecutorProvider {
-
-}
+package org.apache.beam.sdk.extensions.euphoria.beam.common;
diff --git
a/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/org/apache/beam/sdk/extensions/euphoria/beam/join/FullJoinFn.java
b/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/org/apache/beam/sdk/extensions/euphoria/beam/join/FullJoinFn.java
new file mode 100644
index 00000000000..7b739893c7b
--- /dev/null
+++
b/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/org/apache/beam/sdk/extensions/euphoria/beam/join/FullJoinFn.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.euphoria.beam.join;
+
+import org.apache.beam.sdk.extensions.euphoria.beam.SingleValueCollector;
+import
org.apache.beam.sdk.extensions.euphoria.core.client.functional.BinaryFunctor;
+import org.apache.beam.sdk.extensions.euphoria.core.client.util.Pair;
+import org.apache.beam.sdk.transforms.join.CoGbkResult;
+import org.apache.beam.sdk.values.TupleTag;
+
+/**
+ * Full join implementation of {@link JoinFn}.
+ */
+public class FullJoinFn<LeftT, RightT, K, OutputT> extends JoinFn<LeftT,
RightT, K, OutputT> {
+
+ public FullJoinFn(BinaryFunctor<LeftT, RightT, OutputT> joiner,
TupleTag<LeftT> leftTag,
+ TupleTag<RightT> rightTag) {
+ super(joiner, leftTag, rightTag);
+ }
+
+ @Override
+ protected void doJoin(
+ ProcessContext c, K key, CoGbkResult value,
+ Iterable<LeftT> leftSideIter,
+ Iterable<RightT> rightSideIter) {
+
+ boolean leftHasValues = leftSideIter.iterator().hasNext();
+ boolean rightHasValues = rightSideIter.iterator().hasNext();
+
+ SingleValueCollector<OutputT> outCollector = new SingleValueCollector<>();
+
+ if (leftHasValues && rightHasValues) {
+ for (RightT rightValue : rightSideIter) {
+ for (LeftT leftValue : leftSideIter) {
+ joiner.apply(leftValue, rightValue, outCollector);
+ c.output(Pair.of(key, outCollector.get()));
+ }
+ }
+ } else if (leftHasValues) {
+ for (LeftT leftValue : leftSideIter) {
+ joiner.apply(leftValue, null, outCollector);
+ c.output(Pair.of(key, outCollector.get()));
+ }
+ } else if (rightHasValues) {
+ for (RightT rightValue : rightSideIter) {
+ joiner.apply(null, rightValue, outCollector);
+ c.output(Pair.of(key, outCollector.get()));
+ }
+ }
+ }
+
+ @Override
+ public String getFnName() {
+ return "::full-join";
+ }
+}
diff --git
a/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/org/apache/beam/sdk/extensions/euphoria/beam/join/InnerJoinFn.java
b/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/org/apache/beam/sdk/extensions/euphoria/beam/join/InnerJoinFn.java
new file mode 100644
index 00000000000..4cc7d351ecb
--- /dev/null
+++
b/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/org/apache/beam/sdk/extensions/euphoria/beam/join/InnerJoinFn.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.euphoria.beam.join;
+
+import org.apache.beam.sdk.extensions.euphoria.beam.SingleValueCollector;
+import
org.apache.beam.sdk.extensions.euphoria.core.client.functional.BinaryFunctor;
+import org.apache.beam.sdk.extensions.euphoria.core.client.util.Pair;
+import org.apache.beam.sdk.transforms.join.CoGbkResult;
+import org.apache.beam.sdk.values.TupleTag;
+
+/**
+ * Inner join implementation of {@link JoinFn}.
+ */
+public class InnerJoinFn<LeftT, RightT, K, OutputT> extends JoinFn<LeftT,
RightT, K, OutputT> {
+
+ public InnerJoinFn(
+ BinaryFunctor<LeftT, RightT, OutputT> functor,
+ TupleTag<LeftT> leftTag,
+ TupleTag<RightT> rightTag) {
+ super(functor, leftTag, rightTag);
+ }
+
+ @Override
+ protected void doJoin(
+ ProcessContext c, K key, CoGbkResult value,
+ Iterable<LeftT> leftSideIter,
+ Iterable<RightT> rightSideIter) {
+
+ SingleValueCollector<OutputT> outCollector = new SingleValueCollector<>();
+
+ for (LeftT leftItem : leftSideIter) {
+ for (RightT rightItem : rightSideIter) {
+ joiner.apply(leftItem, rightItem, outCollector);
+ c.output(Pair.of(key, outCollector.get()));
+ }
+ }
+ }
+
+ @Override
+ public String getFnName() {
+ return "::inner-join";
+ }
+
+}
diff --git
a/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/org/apache/beam/sdk/extensions/euphoria/beam/join/JoinFn.java
b/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/org/apache/beam/sdk/extensions/euphoria/beam/join/JoinFn.java
new file mode 100644
index 00000000000..fc2a4b53031
--- /dev/null
+++
b/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/org/apache/beam/sdk/extensions/euphoria/beam/join/JoinFn.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.euphoria.beam.join;
+
+import
org.apache.beam.sdk.extensions.euphoria.core.client.functional.BinaryFunctor;
+import org.apache.beam.sdk.extensions.euphoria.core.client.util.Pair;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.join.CoGbkResult;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TupleTag;
+
+/**
+ * Abstract base for joint implementations.
+ *
+ * @param <LeftT> type of left-side elements
+ * @param <RightT> type of right-side elements
+ * @param <K> key type
+ * @param <OutputT> type of output elements
+ */
+public abstract class JoinFn<LeftT, RightT, K, OutputT> extends
+ DoFn<KV<K, CoGbkResult>, Pair<K, OutputT>> {
+
+ protected final BinaryFunctor<LeftT, RightT, OutputT> joiner;
+ protected final TupleTag<LeftT> leftTag;
+ protected final TupleTag<RightT> rightTag;
+
+ protected JoinFn(
+ BinaryFunctor<LeftT, RightT, OutputT> joiner,
+ TupleTag<LeftT> leftTag, TupleTag<RightT> rightTag) {
+ this.joiner = joiner;
+ this.leftTag = leftTag;
+ this.rightTag = rightTag;
+ }
+
+ @ProcessElement
+ public final void processElement(ProcessContext c) {
+
+ KV<K, CoGbkResult> element = c.element();
+ CoGbkResult value = element.getValue();
+ K key = element.getKey();
+
+ Iterable<LeftT> leftSideIter = value.getAll(leftTag);
+ Iterable<RightT> rightSideIter = value.getAll(rightTag);
+
+ doJoin(c, key, value, leftSideIter, rightSideIter);
+ }
+
+ protected abstract void doJoin(
+ ProcessContext c, K key, CoGbkResult value,
+ Iterable<LeftT> leftSideIter,
+ Iterable<RightT> rightSideIter);
+
+ public abstract String getFnName();
+}
diff --git
a/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/org/apache/beam/sdk/extensions/euphoria/beam/join/LeftOuterJoinFn.java
b/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/org/apache/beam/sdk/extensions/euphoria/beam/join/LeftOuterJoinFn.java
new file mode 100644
index 00000000000..cb4bac44123
--- /dev/null
+++
b/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/org/apache/beam/sdk/extensions/euphoria/beam/join/LeftOuterJoinFn.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.euphoria.beam.join;
+
+import org.apache.beam.sdk.extensions.euphoria.beam.SingleValueCollector;
+import
org.apache.beam.sdk.extensions.euphoria.core.client.functional.BinaryFunctor;
+import org.apache.beam.sdk.extensions.euphoria.core.client.util.Pair;
+import org.apache.beam.sdk.transforms.join.CoGbkResult;
+import org.apache.beam.sdk.values.TupleTag;
+
+
+/**
+ * Left outer join implementation of {@link JoinFn}.
+ */
+public class LeftOuterJoinFn<LeftT, RightT, K, OutputT> extends JoinFn<LeftT,
RightT, K, OutputT> {
+
+ public LeftOuterJoinFn(
+ BinaryFunctor<LeftT, RightT, OutputT> joiner,
+ TupleTag<LeftT> leftTag,
+ TupleTag<RightT> rightTag) {
+ super(joiner, leftTag, rightTag);
+ }
+
+ @Override
+ protected void doJoin(
+ ProcessContext c, K key, CoGbkResult value,
+ Iterable<LeftT> leftSideIter,
+ Iterable<RightT> rightSideIter) {
+
+ SingleValueCollector<OutputT> outCollector = new SingleValueCollector<>();
+
+ for (LeftT leftValue : leftSideIter) {
+ if (rightSideIter.iterator().hasNext()) {
+ for (RightT rightValue : rightSideIter) {
+ joiner.apply(leftValue, rightValue, outCollector);
+ c.output(Pair.of(key, outCollector.get()));
+ }
+ } else {
+ joiner.apply(leftValue, null, outCollector);
+ c.output(Pair.of(key, outCollector.get()));
+ }
+ }
+ }
+
+ @Override
+ public String getFnName() {
+ return "::left-outer-join";
+ }
+
+}
diff --git
a/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/org/apache/beam/sdk/extensions/euphoria/beam/join/RightOuterJoinFn.java
b/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/org/apache/beam/sdk/extensions/euphoria/beam/join/RightOuterJoinFn.java
new file mode 100644
index 00000000000..6ff0f5a5618
--- /dev/null
+++
b/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/org/apache/beam/sdk/extensions/euphoria/beam/join/RightOuterJoinFn.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.euphoria.beam.join;
+
+import org.apache.beam.sdk.extensions.euphoria.beam.SingleValueCollector;
+import
org.apache.beam.sdk.extensions.euphoria.core.client.functional.BinaryFunctor;
+import org.apache.beam.sdk.extensions.euphoria.core.client.util.Pair;
+import org.apache.beam.sdk.transforms.join.CoGbkResult;
+import org.apache.beam.sdk.values.TupleTag;
+
+/**
+ * Right outer join implementation of {@link JoinFn}.
+ */
+public class RightOuterJoinFn<LeftT, RightT, K, OutputT> extends JoinFn<LeftT,
RightT, K, OutputT> {
+
+ public RightOuterJoinFn(
+ BinaryFunctor<LeftT, RightT, OutputT> joiner,
+ TupleTag<LeftT> leftTag,
+ TupleTag<RightT> rightTag) {
+ super(joiner, leftTag, rightTag);
+ }
+
+ @Override
+ protected void doJoin(ProcessContext c, K key, CoGbkResult value,
Iterable<LeftT> leftSideIter,
+ Iterable<RightT> rightSideIter) {
+
+ SingleValueCollector<OutputT> outCollector = new SingleValueCollector<>();
+
+ for (RightT rightValue : rightSideIter) {
+ if (leftSideIter.iterator().hasNext()) {
+ for (LeftT leftValue : leftSideIter) {
+ joiner.apply(leftValue, rightValue, outCollector);
+ c.output(Pair.of(key, outCollector.get()));
+ }
+ } else {
+ joiner.apply(null, rightValue, outCollector);
+ c.output(Pair.of(key, outCollector.get()));
+ }
+ }
+
+ }
+
+ @Override
+ public String getFnName() {
+ return "::right-outer-join";
+ }
+}
diff --git
a/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/org/apache/beam/sdk/extensions/euphoria/beam/join/package-info.java
b/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/org/apache/beam/sdk/extensions/euphoria/beam/join/package-info.java
new file mode 100644
index 00000000000..8548a4a1265
--- /dev/null
+++
b/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/org/apache/beam/sdk/extensions/euphoria/beam/join/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * {@link org.apache.beam.sdk.extensions.euphoria.core.client.operator.Join}
translation centered
+ * classes.
+ */
+package org.apache.beam.sdk.extensions.euphoria.beam.join;
diff --git
a/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/org/apache/beam/sdk/extensions/euphoria/beam/window/BeamWindowing.java
b/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/org/apache/beam/sdk/extensions/euphoria/beam/window/BeamWindowing.java
new file mode 100644
index 00000000000..c441a3fb486
--- /dev/null
+++
b/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/org/apache/beam/sdk/extensions/euphoria/beam/window/BeamWindowing.java
@@ -0,0 +1,77 @@
+/*
+ * Copyright 2016-2018 Seznam.cz, a.s.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.euphoria.beam.window;
+
+
+import
org.apache.beam.sdk.extensions.euphoria.core.client.dataset.windowing.WindowedElement;
+import
org.apache.beam.sdk.extensions.euphoria.core.client.dataset.windowing.Windowing;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.Trigger;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.beam.sdk.values.WindowingStrategy.AccumulationMode;
+
+/**
+ * Euphoria's {@link Windowing} which wraps Beam's {@link WindowFn}, {@link
Trigger} and {@link
+ * WindowingStrategy.AccumulationMode} to allow Beam's widowing to be defined
through Euphoria API.
+ */
+public class BeamWindowing<T, BeamWinT extends BoundedWindow> implements
+ Windowing<T, UnsupportedWindow> {
+
+ private final WindowFn<?, BeamWinT> windowFn;
+ private final Trigger trigger;
+ private final WindowingStrategy.AccumulationMode accumulationMode;
+
+ private BeamWindowing(WindowFn<?, BeamWinT> windowFn, Trigger beamTrigger,
+ AccumulationMode accumulationMode) {
+ this.windowFn = windowFn;
+ this.trigger = beamTrigger;
+ this.accumulationMode = accumulationMode;
+ }
+
+ public static <T, BeamWinT extends BoundedWindow> BeamWindowing<T, BeamWinT>
of(
+
+ WindowFn<?, BeamWinT> windowFn, Trigger trigger,
+ WindowingStrategy.AccumulationMode accumulationMode) {
+ return new BeamWindowing<>(windowFn, trigger, accumulationMode);
+ }
+
+
+ @Override
+ public Iterable<UnsupportedWindow> assignWindowsToElement(WindowedElement<?,
T> el) {
+ throw new UnsupportedOperationException(
+ "Beam window serves as envelope, it do not supports element to window
assignment.");
+ }
+
+ @Override
+ public
org.apache.beam.sdk.extensions.euphoria.core.client.triggers.Trigger<UnsupportedWindow>
+ getTrigger() {
+ throw new UnsupportedOperationException(
+ "Beam window serves as envelope, it do not contains Euphoria
trigger.");
+ }
+
+ public WindowFn<?, BeamWinT> getWindowFn() {
+ return windowFn;
+ }
+
+ public AccumulationMode getAccumulationMode() {
+ return accumulationMode;
+ }
+
+ public Trigger getBeamTrigger() {
+ return trigger;
+ }
+}
diff --git
a/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/org/apache/beam/sdk/extensions/euphoria/beam/window/UnsupportedWindow.java
b/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/org/apache/beam/sdk/extensions/euphoria/beam/window/UnsupportedWindow.java
new file mode 100644
index 00000000000..ad267a53e1e
--- /dev/null
+++
b/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/org/apache/beam/sdk/extensions/euphoria/beam/window/UnsupportedWindow.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.euphoria.beam.window;
+
+import java.util.Objects;
+import
org.apache.beam.sdk.extensions.euphoria.core.client.dataset.windowing.Window;
+
+
+/**
+ * Window used as type parameter of {@link BeamWindowing}.
+ */
+final class UnsupportedWindow extends Window<UnsupportedWindow> {
+
+ private UnsupportedWindow(){
+ //Do not instantiate
+ }
+
+ @Override
+ public int compareTo(UnsupportedWindow o) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long maxTimestamp() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int hashCode() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ return Objects.equals(this, obj);
+ }
+
+
+}
diff --git
a/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/org/apache/beam/sdk/extensions/euphoria/beam/window/WindowingUtils.java
b/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/org/apache/beam/sdk/extensions/euphoria/beam/window/WindowingUtils.java
new file mode 100644
index 00000000000..fe7b1acd1fa
--- /dev/null
+++
b/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/org/apache/beam/sdk/extensions/euphoria/beam/window/WindowingUtils.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.euphoria.beam.window;
+
+import
org.apache.beam.sdk.extensions.euphoria.core.client.dataset.windowing.Window;
+import
org.apache.beam.sdk.extensions.euphoria.core.client.dataset.windowing.Windowing;
+import
org.apache.beam.sdk.extensions.euphoria.core.client.operator.WindowWiseOperator;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.values.PCollection;
+import org.joda.time.Duration;
+
+/**
+ * Collection of method helpful when dealing with windowing translations.
+ */
+public class WindowingUtils {
+
+ public static <InputT, OutputT, W extends Window<W>>
+ PCollection<InputT> applyWindowingIfSpecified(
+ WindowWiseOperator<?, ?, OutputT, W> operator,
+ PCollection<InputT> input,
+ Duration allowedLateness) {
+
+ Windowing<?, W> userSpecifiedWindowing = operator.getWindowing();
+
+ if (userSpecifiedWindowing == null) {
+ return input;
+ }
+
+ if (!(userSpecifiedWindowing instanceof BeamWindowing)) {
+ throw new IllegalStateException(String.format(
+ "Use of '%s' is only way supported to specify windowing.",
+ BeamWindowing.class.getSimpleName()));
+ }
+
+ @SuppressWarnings("unchecked")
+ BeamWindowing<InputT, ?> beamWindowing = (BeamWindowing)
userSpecifiedWindowing;
+
+ @SuppressWarnings("unchecked")
+ org.apache.beam.sdk.transforms.windowing.Window<InputT> beamWindow =
+ org.apache.beam.sdk.transforms.windowing.Window
+ .into((WindowFn<InputT, ?>) beamWindowing.getWindowFn())
+ .triggering(beamWindowing.getBeamTrigger());
+
+ switch (beamWindowing.getAccumulationMode()) {
+ case DISCARDING_FIRED_PANES:
+ beamWindow = beamWindow.discardingFiredPanes();
+ break;
+ case ACCUMULATING_FIRED_PANES:
+ beamWindow = beamWindow.accumulatingFiredPanes();
+ break;
+ default:
+ throw new IllegalStateException(
+ "Unsupported accumulation mode '" +
beamWindowing.getAccumulationMode() + "'");
+ }
+
+ beamWindow = beamWindow.withAllowedLateness(allowedLateness);
+
+ return input.apply(operator.getName() + "::windowing", beamWindow);
+ }
+}
diff --git
a/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/JoinTest.java
b/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/JoinTest.java
new file mode 100644
index 00000000000..ddef3f8aa80
--- /dev/null
+++
b/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/JoinTest.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.euphoria.beam;
+
+import static java.util.Arrays.asList;
+
+import java.util.Optional;
+import org.apache.beam.sdk.extensions.euphoria.core.client.flow.Flow;
+import org.apache.beam.sdk.extensions.euphoria.core.client.io.Collector;
+import org.apache.beam.sdk.extensions.euphoria.core.client.io.ListDataSink;
+import org.apache.beam.sdk.extensions.euphoria.core.client.io.ListDataSource;
+import org.apache.beam.sdk.extensions.euphoria.core.client.operator.FullJoin;
+import org.apache.beam.sdk.extensions.euphoria.core.client.operator.Join;
+import org.apache.beam.sdk.extensions.euphoria.core.client.operator.LeftJoin;
+import org.apache.beam.sdk.extensions.euphoria.core.client.operator.RightJoin;
+import org.apache.beam.sdk.extensions.euphoria.core.client.util.Pair;
+import org.apache.beam.sdk.extensions.euphoria.testing.DatasetAssert;
+import org.junit.Test;
+
+/**
+ * Simple test suite for Join operator.
+ */
+public class JoinTest {
+
+ @Test
+ public void simpleInnerJoinTest() {
+ final Flow flow = Flow.create();
+
+ ListDataSource<Pair<Integer, String>> left =
+ ListDataSource.bounded(
+ asList(
+ Pair.of(1, "L v1"), Pair.of(1, "L v2"),
+ Pair.of(2, "L v1"), Pair.of(2, "L v2"),
+ Pair.of(3, "L v1")
+ ));
+
+ ListDataSource<Pair<Integer, Integer>> right =
+ ListDataSource.bounded(
+ asList(
+ Pair.of(1, 1), Pair.of(1, 10),
+ Pair.of(2, 20),
+ Pair.of(4, 40)
+ ));
+
+ ListDataSink<Pair<Integer, Pair<String, Integer>>> output =
ListDataSink.get();
+
+ Join.of(flow.createInput(left), flow.createInput(right))
+ .by(Pair::getFirst, Pair::getFirst)
+ .using(
+ (Pair<Integer, String> l, Pair<Integer, Integer> r,
Collector<Pair<String, Integer>> c)
+ -> c.collect(Pair.of(l.getSecond(), r.getSecond())))
+ .output()
+ .persist(output);
+
+ BeamExecutor executor = TestUtils.createExecutor();
+ executor.execute(flow);
+
+ DatasetAssert.unorderedEquals(output.getOutputs(),
+ Pair.of(1, Pair.of("L v1", 1)), Pair.of(1, Pair.of("L v1", 10)),
+ Pair.of(1, Pair.of("L v2", 1)), Pair.of(1, Pair.of("L v2", 10)),
+
+ Pair.of(2, Pair.of("L v1", 20)), Pair.of(2, Pair.of("L v2", 20))
+ );
+
+ }
+
+ @Test
+ public void simpleLeftJoinTest() {
+ final Flow flow = Flow.create();
+
+ ListDataSource<Pair<Integer, String>> left =
+ ListDataSource.bounded(
+ asList(
+ Pair.of(1, "L v1"), Pair.of(1, "L v2"),
+ Pair.of(2, "L v1"), Pair.of(2, "L v2"),
+ Pair.of(3, "L v1")
+ ));
+
+ ListDataSource<Pair<Integer, Integer>> right =
+ ListDataSource.bounded(
+ asList(
+ Pair.of(1, 1), Pair.of(1, 10),
+ Pair.of(2, 20),
+ Pair.of(4, 40)
+ ));
+
+ ListDataSink<Pair<Integer, Pair<String, Integer>>> output =
ListDataSink.get();
+
+ LeftJoin.of(flow.createInput(left), flow.createInput(right))
+ .by(Pair::getFirst, Pair::getFirst)
+ .using((Pair<Integer, String> l, Optional<Pair<Integer, Integer>> r,
+ Collector<Pair<String, Integer>> c) ->
+ c.collect(Pair.of(l.getSecond(), r.orElse(Pair.of(null,
null)).getSecond())))
+ .output()
+ .persist(output);
+
+ BeamExecutor executor = TestUtils.createExecutor();
+ executor.execute(flow);
+
+ DatasetAssert.unorderedEquals(output.getOutputs(),
+ Pair.of(1, Pair.of("L v1", 1)), Pair.of(1, Pair.of("L v1", 10)),
+ Pair.of(1, Pair.of("L v2", 1)), Pair.of(1, Pair.of("L v2", 10)),
+
+ Pair.of(2, Pair.of("L v1", 20)), Pair.of(2, Pair.of("L v2", 20)),
+
+ Pair.of(3, Pair.of("L v1", null))
+ );
+
+ }
+
+ @Test
+ public void simpleRightJoinTest() {
+ final Flow flow = Flow.create();
+
+ ListDataSource<Pair<Integer, String>> left =
+ ListDataSource.bounded(
+ asList(
+ Pair.of(1, "L v1"), Pair.of(1, "L v2"),
+ Pair.of(2, "L v1"), Pair.of(2, "L v2"),
+ Pair.of(3, "L v1")
+ ));
+
+ ListDataSource<Pair<Integer, Integer>> right =
+ ListDataSource.bounded(
+ asList(
+ Pair.of(1, 1), Pair.of(1, 10),
+ Pair.of(2, 20),
+ Pair.of(4, 40)
+ ));
+
+ ListDataSink<Pair<Integer, Pair<String, Integer>>> output =
ListDataSink.get();
+
+ RightJoin.of(flow.createInput(left), flow.createInput(right))
+ .by(Pair::getFirst, Pair::getFirst)
+ .using(
+ (Optional<Pair<Integer, String>> l, Pair<Integer, Integer> r,
+ Collector<Pair<String, Integer>> c) ->
+ c.collect(Pair.of(l.orElse(Pair.of(null, null)).getSecond(),
r.getSecond())))
+ .output()
+ .persist(output);
+
+ BeamExecutor executor = TestUtils.createExecutor();
+ executor.execute(flow);
+
+ DatasetAssert.unorderedEquals(output.getOutputs(),
+ Pair.of(1, Pair.of("L v1", 1)), Pair.of(1, Pair.of("L v1", 10)),
+ Pair.of(1, Pair.of("L v2", 1)), Pair.of(1, Pair.of("L v2", 10)),
+
+ Pair.of(2, Pair.of("L v1", 20)), Pair.of(2, Pair.of("L v2", 20)),
+
+ Pair.of(4, Pair.of(null, 40))
+ );
+
+ }
+
+ @Test
+ public void simpleFullJoinTest() {
+ final Flow flow = Flow.create();
+
+ ListDataSource<Pair<Integer, String>> left =
+ ListDataSource.bounded(
+ asList(
+ Pair.of(1, "L v1"), Pair.of(1, "L v2"),
+ Pair.of(2, "L v1"), Pair.of(2, "L v2"),
+ Pair.of(3, "L v1")
+ ));
+
+ ListDataSource<Pair<Integer, Integer>> right =
+ ListDataSource.bounded(
+ asList(
+ Pair.of(1, 1), Pair.of(1, 10),
+ Pair.of(2, 20),
+ Pair.of(4, 40)
+ ));
+
+ ListDataSink<Pair<Integer, Pair<String, Integer>>> output =
ListDataSink.get();
+
+ FullJoin.of(flow.createInput(left), flow.createInput(right))
+ .by(Pair::getFirst, Pair::getFirst)
+ .using((Optional<Pair<Integer, String>> l, Optional<Pair<Integer,
Integer>> r,
+ Collector<Pair<String, Integer>> c) -> c.collect(Pair.of(
+ l.orElse(Pair.of(null, null)).getSecond(), r.orElse(Pair.of(null,
null)).getSecond())))
+ .output()
+ .persist(output);
+
+ BeamExecutor executor = TestUtils.createExecutor();
+ executor.execute(flow);
+
+ DatasetAssert.unorderedEquals(output.getOutputs(),
+ Pair.of(1, Pair.of("L v1", 1)), Pair.of(1, Pair.of("L v1", 10)),
+ Pair.of(1, Pair.of("L v2", 1)), Pair.of(1, Pair.of("L v2", 10)),
+
+ Pair.of(2, Pair.of("L v1", 20)), Pair.of(2, Pair.of("L v2", 20)),
+
+ Pair.of(3, Pair.of("L v1", null)),
+ Pair.of(4, Pair.of(null, 40))
+ );
+
+ }
+
+}
diff --git
a/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/ReduceByKeyTest.java
b/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/ReduceByKeyTest.java
index 2a913ab9746..519e94fe976 100644
---
a/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/ReduceByKeyTest.java
+++
b/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/ReduceByKeyTest.java
@@ -19,11 +19,10 @@
import static org.junit.Assert.assertTrue;
-import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
+import org.apache.beam.sdk.extensions.euphoria.beam.window.BeamWindowing;
import org.apache.beam.sdk.extensions.euphoria.core.client.dataset.Dataset;
-import
org.apache.beam.sdk.extensions.euphoria.core.client.dataset.windowing.Time;
import
org.apache.beam.sdk.extensions.euphoria.core.client.dataset.windowing.TimeInterval;
import
org.apache.beam.sdk.extensions.euphoria.core.client.dataset.windowing.Window;
import
org.apache.beam.sdk.extensions.euphoria.core.client.dataset.windowing.WindowedElement;
@@ -48,8 +47,9 @@
import org.apache.beam.sdk.extensions.euphoria.core.client.util.Pair;
import org.apache.beam.sdk.extensions.euphoria.core.client.util.Sums;
import org.apache.beam.sdk.extensions.euphoria.testing.DatasetAssert;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.values.WindowingStrategy.AccumulationMode;
import org.junit.Ignore;
import org.junit.Test;
@@ -58,12 +58,6 @@
*/
public class ReduceByKeyTest {
- private BeamExecutor createExecutor() {
- String[] args = {"--runner=DirectRunner"};
- PipelineOptions options =
PipelineOptionsFactory.fromArgs(args).as(PipelineOptions.class);
- return new BeamExecutor(options).withAllowedLateness(Duration.ofHours(1));
- }
-
@Test
public void testSimpleRBK() {
final Flow flow = Flow.create();
@@ -76,11 +70,14 @@ public void testSimpleRBK() {
ReduceByKey.of(flow.createInput(input, e -> 1000L * e))
.keyBy(i -> i % 2)
.reduceBy(Sums.ofInts())
- .windowBy(Time.of(Duration.ofHours(1)))
+ .windowBy(BeamWindowing.of(
+ FixedWindows.of(org.joda.time.Duration.standardHours(1)),
+ AfterWatermark.pastEndOfWindow(),
+ AccumulationMode.DISCARDING_FIRED_PANES))
.output()
.persist(output);
- BeamExecutor executor = createExecutor();
+ BeamExecutor executor = TestUtils.createExecutor();
executor.execute(flow);
DatasetAssert.unorderedEquals(output.getOutputs(), Pair.of(0, 8),
Pair.of(1, 7));
@@ -118,15 +115,19 @@ public void testEventTime() {
ListDataSink<Pair<Integer, Long>> sink = ListDataSink.get();
Dataset<Pair<Integer, Long>> input = flow.createInput(source);
input = AssignEventTime.of(input).using(Pair::getSecond).output();
+
ReduceByKey.of(input)
.keyBy(Pair::getFirst)
.valueBy(e -> 1L)
.combineBy(Sums.ofLongs())
- .windowBy(Time.of(Duration.ofSeconds(1)))
+ .windowBy(BeamWindowing.of(
+ FixedWindows.of(org.joda.time.Duration.standardSeconds(1)),
+ AfterWatermark.pastEndOfWindow(),
+ AccumulationMode.DISCARDING_FIRED_PANES))
.output()
.persist(sink);
- BeamExecutor executor = createExecutor();
+ BeamExecutor executor = TestUtils.createExecutor();
executor.execute(flow);
DatasetAssert.unorderedEquals(
@@ -172,7 +173,10 @@ public void testElementTimestamp() {
.keyBy(e -> "", TypeHint.ofString())
.valueBy(Pair::getFirst, TypeHint.ofInt())
.combineBy(Sums.ofInts(), TypeHint.ofInt())
- .windowBy(Time.of(Duration.ofSeconds(5)))
+ .windowBy(BeamWindowing.of(
+ FixedWindows.of(org.joda.time.Duration.standardSeconds(5)),
+ AfterWatermark.pastEndOfWindow(),
+ AccumulationMode.DISCARDING_FIRED_PANES))
.output();
// ~ now use a custom windowing with a trigger which does
// the assertions subject to this test (use RSBK which has to
@@ -192,7 +196,7 @@ public void testElementTimestamp() {
.output()
.persist(sink);
- createExecutor().execute(flow);
+ TestUtils.createExecutor().execute(flow);
DatasetAssert.unorderedEquals(sink.getOutputs(), 4, 6);
}
diff --git
a/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/TestUtils.java
b/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/TestUtils.java
new file mode 100644
index 00000000000..242dabeeb8c
--- /dev/null
+++
b/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/TestUtils.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.euphoria.beam;
+
+import java.time.Duration;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+
+/**
+ * Collection of methods reused among tests.
+ */
+public class TestUtils {
+
+ static BeamExecutor createExecutor() {
+ String[] args = {"--runner=DirectRunner"};
+ PipelineOptions options =
PipelineOptionsFactory.fromArgs(args).as(PipelineOptions.class);
+ return new BeamExecutor(options).withAllowedLateness(Duration.ofHours(1));
+ }
+}
diff --git
a/sdks/java/extensions/euphoria/euphoria-local/src/test/java/org/apache/beam/sdk/extensions/euphoria/executor/local/testkit/LocalOperatorTest.java
b/sdks/java/extensions/euphoria/euphoria-local/src/test/java/org/apache/beam/sdk/extensions/euphoria/executor/local/testkit/LocalOperatorTest.java
index ce3c7a0dbb2..43335e899bb 100644
---
a/sdks/java/extensions/euphoria/euphoria-local/src/test/java/org/apache/beam/sdk/extensions/euphoria/executor/local/testkit/LocalOperatorTest.java
+++
b/sdks/java/extensions/euphoria/euphoria-local/src/test/java/org/apache/beam/sdk/extensions/euphoria/executor/local/testkit/LocalOperatorTest.java
@@ -17,9 +17,11 @@
*/
package org.apache.beam.sdk.extensions.euphoria.executor.local.testkit;
-import org.apache.beam.sdk.extensions.euphoria.operator.test.AllOperatorsSuite;
+import
org.apache.beam.sdk.extensions.euphoria.operator.test.suite.OperatorsTestSuite;
+import org.junit.Ignore;
/**
* Local operator test suite.
*/
-public class LocalOperatorTest extends AllOperatorsSuite implements
LocalExecutorProvider {}
+@Ignore("Local executor do not supports beam widowing.")
+public class LocalOperatorTest extends OperatorsTestSuite implements
LocalExecutorProvider {}
diff --git
a/sdks/java/extensions/euphoria/euphoria-operator-testkit/build.gradle
b/sdks/java/extensions/euphoria/euphoria-operator-testkit/build.gradle
index 3b0fce2be4e..4b0a2b7027b 100644
--- a/sdks/java/extensions/euphoria/euphoria-operator-testkit/build.gradle
+++ b/sdks/java/extensions/euphoria/euphoria-operator-testkit/build.gradle
@@ -22,6 +22,12 @@ applyJavaNature()
description = "Apache Beam :: SDKs :: Java :: Extensions :: Euphoria Java 8
DSL"
dependencies {
- compile project(':beam-sdks-java-extensions-euphoria-core')
+ compile project(':beam-sdks-java-extensions-euphoria-beam')
+ testCompile project(':beam-runners-direct-java')
+ testCompile project(':beam-sdks-java-extensions-euphoria-testing')
+ shadow 'com.google.code.findbugs:annotations:3.0.1'
compileOnly library.java.junit
+ testCompile library.java.slf4j_api
}
+
+test.testLogging.showStandardStreams = true
\ No newline at end of file
diff --git
a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/JoinTest.java
b/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/JoinTest.java
index e05203fe0b6..bd4b5761990 100644
---
a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/JoinTest.java
+++
b/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/JoinTest.java
@@ -19,18 +19,19 @@
import static org.junit.Assert.assertEquals;
-import java.time.Duration;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.extensions.euphoria.beam.io.KryoCoder;
+import org.apache.beam.sdk.extensions.euphoria.beam.window.BeamWindowing;
import org.apache.beam.sdk.extensions.euphoria.core.client.dataset.Dataset;
-import
org.apache.beam.sdk.extensions.euphoria.core.client.dataset.windowing.Session;
-import
org.apache.beam.sdk.extensions.euphoria.core.client.dataset.windowing.Time;
import
org.apache.beam.sdk.extensions.euphoria.core.client.dataset.windowing.TimeInterval;
-import
org.apache.beam.sdk.extensions.euphoria.core.client.dataset.windowing.WindowedElement;
-import
org.apache.beam.sdk.extensions.euphoria.core.client.dataset.windowing.Windowing;
import org.apache.beam.sdk.extensions.euphoria.core.client.flow.Flow;
import org.apache.beam.sdk.extensions.euphoria.core.client.io.Collector;
import org.apache.beam.sdk.extensions.euphoria.core.client.io.ListDataSource;
@@ -40,17 +41,27 @@
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.LeftJoin;
import
org.apache.beam.sdk.extensions.euphoria.core.client.operator.MapElements;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.RightJoin;
-import
org.apache.beam.sdk.extensions.euphoria.core.client.triggers.NoopTrigger;
-import org.apache.beam.sdk.extensions.euphoria.core.client.triggers.Trigger;
-import org.apache.beam.sdk.extensions.euphoria.core.client.util.Either;
import org.apache.beam.sdk.extensions.euphoria.core.client.util.Pair;
import org.apache.beam.sdk.extensions.euphoria.core.client.util.Triple;
import
org.apache.beam.sdk.extensions.euphoria.operator.test.accumulators.SnapshotProvider;
import
org.apache.beam.sdk.extensions.euphoria.operator.test.junit.AbstractOperatorTest;
import org.apache.beam.sdk.extensions.euphoria.operator.test.junit.Processing;
+import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.Sessions;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.WindowingStrategy.AccumulationMode;
+import org.joda.time.Instant;
+import org.junit.Ignore;
import org.junit.Test;
-/** Test operator {@code Join}. */
+/**
+ * Test operator {@code Join}.
+ */
@Processing(Processing.Type.ALL)
public class JoinTest extends AbstractOperatorTest {
@@ -271,7 +282,10 @@ public void windowJoinFullOuter() {
(Optional<Integer> l, Optional<Long> r, Collector<String>
c) -> {
c.collect(l.orElse(null) + "+" + r.orElse(null));
})
- .windowBy(new EvenOddWindowing())
+ .windowBy(BeamWindowing.of(
+ new EvenOddWindowFn(),
+ AfterWatermark.pastEndOfWindow(),
+ AccumulationMode.DISCARDING_FIRED_PANES))
.output();
}
@@ -321,7 +335,10 @@ public void windowJoinLeftOuter() {
(Integer l, Optional<Long> r, Collector<String> c) -> {
c.collect(l + "+" + r.orElse(null));
})
- .windowBy(new EvenOddWindowing())
+ .windowBy(BeamWindowing.of(
+ new EvenOddWindowFn(),
+ AfterWatermark.pastEndOfWindow(),
+ AccumulationMode.DISCARDING_FIRED_PANES))
.output();
}
@@ -366,7 +383,10 @@ public void windowJoinRightOuter() {
(Optional<Integer> l, Long r, Collector<String> c) -> {
c.collect(l.orElse(null) + "+" + r);
})
- .windowBy(new EvenOddWindowing())
+ .windowBy(BeamWindowing.of(
+ new EvenOddWindowFn(),
+ AfterWatermark.pastEndOfWindow(),
+ AccumulationMode.DISCARDING_FIRED_PANES))
.output();
}
@@ -403,7 +423,7 @@ public void windowJoinRightOuter() {
public void joinOnSessionWindowingNoEarlyTriggering() {
execute(
new JoinTestCase<
- Pair<String, Long>, Pair<String, Long>, Triple<TimeInterval,
String, String>>() {
+ Pair<String, Long>, Pair<String, Long>, Pair<String, String>>() {
@Override
protected List<Pair<String, Long>> getLeftInput() {
@@ -416,37 +436,41 @@ public void joinOnSessionWindowingNoEarlyTriggering() {
}
@Override
- protected Dataset<Triple<TimeInterval, String, String>> getOutput(
+ protected Dataset<Pair<String, String>> getOutput(
Dataset<Pair<String, Long>> left, Dataset<Pair<String, Long>>
right) {
+
left = AssignEventTime.of(left).using(Pair::getSecond).output();
right = AssignEventTime.of(right).using(Pair::getSecond).output();
- Dataset<Pair<String, Triple<TimeInterval, String, String>>> joined
=
+
+ Dataset<Pair<String, Pair<String, String>>> joined =
Join.of(left, right)
.by(p -> "", p -> "")
.using(
- (Pair<String, Long> l,
- Pair<String, Long> r,
- Collector<Triple<TimeInterval, String, String>> c)
->
- c.collect(
- Triple.of(
- (TimeInterval) c.getWindow(),
l.getFirst(), r.getFirst())))
- .windowBy(Session.of(Duration.ofMillis(10)))
+ (Pair<String, Long> l, Pair<String, Long> r,
+ Collector<Pair<String, String>> c) ->
+ c.collect(Pair.of(l.getFirst(), r.getFirst())))
+ .windowBy(BeamWindowing.of(
+
Sessions.withGapDuration(org.joda.time.Duration.millis(10)),
+ AfterWatermark.pastEndOfWindow(),
+ AccumulationMode.DISCARDING_FIRED_PANES))
.output();
return MapElements.of(joined).using(Pair::getSecond).output();
}
@Override
- public List<Triple<TimeInterval, String, String>>
getUnorderedOutput() {
- TimeInterval expectedWindow = new TimeInterval(1, 14);
+ public List<Pair<String, String>> getUnorderedOutput() {
return Arrays.asList(
- Triple.of(expectedWindow, "fi", "ha"),
- Triple.of(expectedWindow, "fi", "ho"),
- Triple.of(expectedWindow, "fa", "ha"),
- Triple.of(expectedWindow, "fa", "ho"));
+ Pair.of("fi", "ha"),
+ Pair.of("fi", "ho"),
+ Pair.of("fa", "ha"),
+ Pair.of("fa", "ho"));
}
});
}
+ @Ignore(
+ "This test is based on access to various objects through Environment
which is "
+ + "unsupported feature. It may be possible to add this feature in
future.")
@Test
public void testJoinAccumulators() {
execute(
@@ -466,8 +490,10 @@ public void testJoinAccumulators() {
@Override
protected Dataset<Triple<TimeInterval, String, String>> getOutput(
Dataset<Pair<String, Long>> left, Dataset<Pair<String, Long>>
right) {
+
left = AssignEventTime.of(left).using(Pair::getSecond).output();
right = AssignEventTime.of(right).using(Pair::getSecond).output();
+
Dataset<Pair<String, Triple<TimeInterval, String, String>>> joined
=
Join.of(left, right)
.by(p -> "", p -> "")
@@ -480,7 +506,11 @@ public void testJoinAccumulators() {
c.getHistogram("hist-" +
l.getFirst().charAt(1)).add(2345, 8);
c.collect(Triple.of(window, l.getFirst(),
r.getFirst()));
})
- .windowBy(Time.of(Duration.ofMillis(3)))
+// .windowBy(Time.of(Duration.ofMillis(3)))
+ .windowBy(BeamWindowing.of(
+ FixedWindows.of(org.joda.time.Duration.millis(3)),
+ AfterWatermark.pastEndOfWindow(),
+ AccumulationMode.DISCARDING_FIRED_PANES))
.output();
return MapElements.of(joined).using(Pair::getSecond).output();
}
@@ -511,11 +541,9 @@ public void validateAccumulators(SnapshotProvider
snapshots) {
/**
* Base for join test cases.
- * @param <LeftT>
- * @param <RightT>
- * @param <OutputT>
*/
public abstract static class JoinTestCase<LeftT, RightT, OutputT> implements
TestCase<OutputT> {
+
@Override
public Dataset<OutputT> getOutput(Flow flow, boolean bounded) {
Dataset<LeftT> left = flow.createInput(ListDataSource.of(bounded,
getLeftInput()));
@@ -530,36 +558,87 @@ public void validateAccumulators(SnapshotProvider
snapshots) {
protected abstract List<RightT> getRightInput();
}
- /** Stable windowing for test purposes. */
- static class EvenOddWindowing implements Windowing<Either<Integer, Long>,
IntWindow> {
+
+ /**
+ * Elements with even numeric values are are assigned to one 'even' window.
All others are
+ * assigned to window named 'win: #', where '#' is value of assigned element.
+ */
+ private static class EvenOddWindowFn extends WindowFn<KV<Integer, Number>,
BoundedWindow> {
+
+ private static final NamedGlobalWindow EVEN_WIN = new
NamedGlobalWindow("even");
@Override
- public Iterable<IntWindow> assignWindowsToElement(
- WindowedElement<?, Either<Integer, Long>> input) {
- int element;
- Either<Integer, Long> unwrapped = input.getElement();
- if (unwrapped.isLeft()) {
- element = unwrapped.left();
+ @SuppressFBWarnings(value = "RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE")
+ public Collection<BoundedWindow> assignWindows(AssignContext c) throws
Exception {
+ KV<Integer, Number> element = c.element();
+
+ Number value = element.getValue();
+
+ if (value == null) {
+ return Collections.singleton(EVEN_WIN);
+ }
+
+ NamedGlobalWindow win;
+ if (value.longValue() % 2 == 0) {
+ win = EVEN_WIN;
} else {
- element = (int) (long) unwrapped.right();
+ win = new NamedGlobalWindow("win: " + value.longValue());
}
- final int label = element % 2 == 0 ? 0 : element;
- return Collections.singleton(new IntWindow(label));
+
+ return Collections.singleton(win);
}
@Override
- public Trigger<IntWindow> getTrigger() {
- return NoopTrigger.get();
+ public void mergeWindows(MergeContext c) throws Exception {
+ // no merging
}
@Override
- public boolean equals(Object obj) {
- return obj instanceof EvenOddWindowing;
+ public boolean isCompatible(WindowFn<?, ?> other) {
+ return other instanceof EvenOddWindowFn;
+ }
+
+ @Override
+ public Coder<BoundedWindow> windowCoder() {
+ return new KryoCoder<>();
+ }
+
+ @Override
+ @Nullable
+ public WindowMappingFn<BoundedWindow> getDefaultWindowMappingFn() {
+ return null;
+ }
+
+ @Override
+ public boolean isNonMerging() {
+ return true;
+ }
+ }
+
+ private static class NamedGlobalWindow extends BoundedWindow {
+
+ private String name;
+
+ public NamedGlobalWindow(String name) {
+ this.name = name;
+ }
+
+ @Override
+ public Instant maxTimestamp() {
+ return GlobalWindow.INSTANCE.maxTimestamp();
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other instanceof NamedGlobalWindow) {
+ return name.equals(((NamedGlobalWindow) other).name);
+ }
+ return false;
}
@Override
public int hashCode() {
- return 0;
+ return name.hashCode();
}
}
}
diff --git
a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/ReduceByKeyTest.java
b/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/ReduceByKeyTest.java
index e23ceb6d257..fc6f1e248e2 100644
---
a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/ReduceByKeyTest.java
+++
b/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/ReduceByKeyTest.java
@@ -23,7 +23,6 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
-import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -33,14 +32,16 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.extensions.euphoria.beam.io.KryoCoder;
+import org.apache.beam.sdk.extensions.euphoria.beam.window.BeamWindowing;
import org.apache.beam.sdk.extensions.euphoria.core.client.dataset.Dataset;
import
org.apache.beam.sdk.extensions.euphoria.core.client.dataset.windowing.Count;
-import
org.apache.beam.sdk.extensions.euphoria.core.client.dataset.windowing.GlobalWindowing;
import
org.apache.beam.sdk.extensions.euphoria.core.client.dataset.windowing.MergingWindowing;
-import
org.apache.beam.sdk.extensions.euphoria.core.client.dataset.windowing.Session;
-import
org.apache.beam.sdk.extensions.euphoria.core.client.dataset.windowing.Time;
import
org.apache.beam.sdk.extensions.euphoria.core.client.dataset.windowing.TimeInterval;
import
org.apache.beam.sdk.extensions.euphoria.core.client.dataset.windowing.Window;
import
org.apache.beam.sdk.extensions.euphoria.core.client.dataset.windowing.WindowedElement;
@@ -56,7 +57,6 @@
import
org.apache.beam.sdk.extensions.euphoria.core.client.operator.state.StateContext;
import
org.apache.beam.sdk.extensions.euphoria.core.client.operator.state.ValueStorage;
import
org.apache.beam.sdk.extensions.euphoria.core.client.operator.state.ValueStorageDescriptor;
-import
org.apache.beam.sdk.extensions.euphoria.core.client.triggers.CountTrigger;
import
org.apache.beam.sdk.extensions.euphoria.core.client.triggers.NoopTrigger;
import org.apache.beam.sdk.extensions.euphoria.core.client.triggers.Trigger;
import
org.apache.beam.sdk.extensions.euphoria.core.client.triggers.TriggerContext;
@@ -68,20 +68,31 @@
import
org.apache.beam.sdk.extensions.euphoria.operator.test.accumulators.SnapshotProvider;
import
org.apache.beam.sdk.extensions.euphoria.operator.test.junit.AbstractOperatorTest;
import org.apache.beam.sdk.extensions.euphoria.operator.test.junit.Processing;
+import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
+import org.apache.beam.sdk.values.WindowingStrategy.AccumulationMode;
+import org.joda.time.Instant;
+import org.junit.Ignore;
import org.junit.Test;
-/** Test operator {@code ReduceByKey}. */
+/**
+ * Test operator {@code ReduceByKey}.
+ */
@Processing(Processing.Type.ALL)
public class ReduceByKeyTest extends AbstractOperatorTest {
- /** Validates the output type upon a `.reduceBy` operation on windows of
size one. */
+ /**
+ * Validates the output type upon a `.reduceBy` operation on global window.
+ */
@Test
public void testReductionType0() {
execute(
- new AbstractTestCase<Integer, Pair<Integer, Set<Integer>>>(
- /* don't parallelize this test, because it doesn't work
- * well with count windows */
- 1) {
+ new AbstractTestCase<Integer, Pair<Integer, Set<Integer>>>() {
@Override
protected List<Integer> getInput() {
return Arrays.asList(1, 2, 3, 4, 5, 6, 7, 9);
@@ -93,7 +104,10 @@ public void testReductionType0() {
.keyBy(e -> e % 2)
.valueBy(e -> e)
.reduceBy(s -> s.collect(Collectors.toSet()))
- .windowBy(Count.of(3))
+ .windowBy(BeamWindowing.of(
+ new GlobalWindows(),
+ AfterWatermark.pastEndOfWindow(),
+ AccumulationMode.DISCARDING_FIRED_PANES))
.output();
}
@@ -101,20 +115,18 @@ public void testReductionType0() {
public List<Pair<Integer, Set<Integer>>> getUnorderedOutput() {
return Arrays.asList(
Pair.of(0, Sets.newHashSet(2, 4, 6)),
- Pair.of(1, Sets.newHashSet(1, 3, 5)),
- Pair.of(1, Sets.newHashSet(7, 9)));
+ Pair.of(1, Sets.newHashSet(1, 3, 5, 7, 9)));
}
});
}
- /** Validates the output type upon a `.reduceBy` operation on windows of
size one. */
+ /**
+ * Validates the output type upon a `.reduceBy` operation on global window.
+ */
@Test
public void testReductionType0_outputValues() {
execute(
- new AbstractTestCase<Integer, Set<Integer>>(
- /* don't parallelize this test, because it doesn't work
- * well with count windows */
- 1) {
+ new AbstractTestCase<Integer, Set<Integer>>() {
@Override
protected List<Integer> getInput() {
return Arrays.asList(1, 2, 3, 4, 5, 6, 7, 9);
@@ -126,19 +138,25 @@ public void testReductionType0_outputValues() {
.keyBy(e -> e % 2)
.valueBy(e -> e)
.reduceBy(s -> s.collect(Collectors.toSet()))
- .windowBy(Count.of(3))
+ .windowBy(BeamWindowing.of(
+ new GlobalWindows(),
+ AfterWatermark.pastEndOfWindow(),
+ AccumulationMode.DISCARDING_FIRED_PANES))
.outputValues();
}
@Override
public List<Set<Integer>> getUnorderedOutput() {
return Arrays.asList(
- Sets.newHashSet(2, 4, 6), Sets.newHashSet(1, 3, 5),
Sets.newHashSet(7, 9));
+ Sets.newHashSet(2, 4, 6), Sets.newHashSet(1, 3, 5, 7, 9));
}
});
}
- /** Validates the output type upon a `.reduceBy` operation on windows of
size one. */
+ /**
+ * Validates the output type upon a `.reduceBy` operation on global window.
+ */
+ @Ignore("Sorting of values is not supported yet.")
@Test
public void testReductionType0WithSortedValues() {
execute(
@@ -174,7 +192,10 @@ public void testReductionType0WithSortedValues() {
}
return cmp;
})
- .windowBy(GlobalWindowing.get())
+ .windowBy(BeamWindowing.of(
+ new GlobalWindows(),
+ AfterWatermark.pastEndOfWindow(),
+ AccumulationMode.DISCARDING_FIRED_PANES))
.output();
}
@@ -194,7 +215,9 @@ public void validate(List<List<Pair<Integer,
List<Integer>>>> outputs)
});
}
- /** Validates the output type upon a `.reduceBy` operation on windows of
size one. */
+ /**
+ * Validates the output type upon a `.reduceBy` operation on windows of size
one.
+ */
@Test
public void testReductionType0MultiValues() {
execute(
@@ -213,7 +236,11 @@ public void testReductionType0MultiValues() {
return ReduceByKey.of(input)
.keyBy(e -> e % 2)
.reduceBy(Fold.whileEmittingEach(0, (a, b) -> a + b))
- .windowBy(Count.of(3))
+// .windowBy(Count.of(3))
+ .windowBy(BeamWindowing.of(
+ new GlobalWindows(),
+ AfterWatermark.pastEndOfWindow(),
+ AccumulationMode.DISCARDING_FIRED_PANES))
.output();
}
@@ -234,12 +261,13 @@ public void validate(List<Pair<Integer, Integer>> output)
{
assertEquals(Arrays.asList(2, 6, 12), byKey.get(0));
assertNotNull(byKey.get(1));
- assertEquals(Sets.newHashSet(1, 4, 9, 7, 16), new
HashSet<>(byKey.get(1)));
+ assertEquals(Sets.newHashSet(1, 4, 9, 16, 25), new
HashSet<>(byKey.get(1)));
}
@Override
public List<Pair<Integer, Integer>> getUnorderedOutput() {
- return Arrays.asList(Pair.of(0, 12), Pair.of(1, 9), Pair.of(1,
16));
+// return Arrays.asList(Pair.of(0, 12), Pair.of(1, 9), Pair.of(1,
16));
+ return Arrays.asList(Pair.of(0, 12), Pair.of(1, 25));
}
});
}
@@ -256,7 +284,11 @@ public void testEventTime() {
.keyBy(Pair::getFirst)
.valueBy(e -> 1L)
.combineBy(Sums.ofLongs())
- .windowBy(Time.of(Duration.ofSeconds(1)))
+// .windowBy(Time.of(Duration.ofSeconds(1)))
+ .windowBy(BeamWindowing.of(
+ FixedWindows.of(org.joda.time.Duration.standardSeconds(1)),
+ AfterWatermark.pastEndOfWindow(),
+ AccumulationMode.DISCARDING_FIRED_PANES))
.output();
}
@@ -318,7 +350,11 @@ public void testReduceWithWindowing() {
.keyBy(e -> e % 3)
.valueBy(e -> 1L)
.combineBy(Sums.ofLongs())
- .windowBy(new TestWindowing())
+// .windowBy(new TestWindowing())
+ .windowBy(BeamWindowing.of(
+ new TestWindowFn(),
+ AfterWatermark.pastEndOfWindow(),
+ AccumulationMode.DISCARDING_FIRED_PANES))
.output();
}
@@ -398,6 +434,7 @@ public void testReduceWithoutWindowing() {
});
}
+ @Ignore("Sorting of values is not supported yet.")
@Processing(Processing.Type.BOUNDED)
@Test
public void testReduceSorted() {
@@ -440,6 +477,7 @@ public void testReduceSorted() {
});
}
+ @Ignore("Test adaption to Beam windowing failed so far.")
@Test
public void testMergingAndTriggering() {
execute(
@@ -449,14 +487,14 @@ public void testMergingAndTriggering() {
protected List<Pair<String, Long>> getInput() {
return Arrays.asList(
Pair.of("a", 20L),
- Pair.of("c", 3000L),
+ Pair.of("c", 3_000L),
Pair.of("b", 10L),
Pair.of("b", 100L),
- Pair.of("a", 4000L),
+ Pair.of("a", 4_000L),
Pair.of("c", 300L),
- Pair.of("b", 1000L),
- Pair.of("b", 50000L),
- Pair.of("a", 100000L),
+ Pair.of("b", 1_000L),
+ Pair.of("b", 50_000L),
+ Pair.of("a", 100_000L),
Pair.of("a", 800L),
Pair.of("a", 80L));
}
@@ -467,7 +505,11 @@ public void testMergingAndTriggering() {
.keyBy(Pair::getFirst)
.valueBy(Pair::getSecond)
.combineBy(Sums.ofLongs())
- .windowBy(new CWindowing<>(3))
+// .windowBy(new CWindowing<>(3))
+ .windowBy(BeamWindowing.of(
+ new MergingByBucketSizeWindowFn<>(3),
+ AfterWatermark.pastEndOfWindow(),
+ AccumulationMode.DISCARDING_FIRED_PANES))
.output();
}
@@ -476,16 +518,16 @@ public void testMergingAndTriggering() {
public List<Pair<String, Long>> getUnorderedOutput() {
return Arrays.asList(
Pair.of("a", 880L),
- Pair.of("a", 104020L),
- Pair.of("b", 1110L),
- Pair.of("b", 50000L),
- Pair.of("c", 3300L));
+ Pair.of("a", 104_020L),
+ Pair.of("b", 1_110L),
+ Pair.of("b", 50_000L),
+ Pair.of("c", 3_300L));
}
});
}
//
----------------------------------------------------------------------------
-
+ @Ignore("Test depends on yet unsupported functionality (access to window
from Collector). ")
@Test
public void testSessionWindowing() {
execute(
@@ -514,13 +556,16 @@ public void testSessionWindowing() {
.keyBy(e -> e.getFirst().charAt(0) - '0')
.valueBy(Pair::getFirst)
.reduceBy(s -> s.collect(Collectors.toSet()))
- .windowBy(Session.of(Duration.ofMillis(5)))
+ .windowBy(BeamWindowing.of(
+ FixedWindows.of(org.joda.time.Duration.millis(5)),
+ AfterWatermark.pastEndOfWindow(),
+ AccumulationMode.DISCARDING_FIRED_PANES))
.output();
return FlatMap.of(reduced)
.using(
(UnaryFunctor<
- Pair<Integer, Set<String>>, Triple<TimeInterval,
Integer, Set<String>>>)
+ Pair<Integer, Set<String>>, Triple<TimeInterval,
Integer, Set<String>>>)
(elem, context) ->
context.collect(
Triple.of(
@@ -545,46 +590,45 @@ public void testSessionWindowing() {
});
}
+
+ @Ignore("Test depends on unsupported ReduceStateByKey operator.")
@Test
public void testElementTimestamp() {
- class AssertingWindowing<T> implements Windowing<T, TimeInterval> {
+
+ class AssertingWindowFn<T> extends WindowFn<T, BoundedWindow> {
+
@Override
- public Iterable<TimeInterval> assignWindowsToElement(WindowedElement<?,
T> el) {
+ public Collection<BoundedWindow> assignWindows(AssignContext c) throws
Exception {
+ long timestamp = c.timestamp().getMillis();
+
// ~ we expect the 'element time' to be the end of the window which
produced the
// element in the preceding upstream (stateful and windowed) operator
assertTrue(
- "Invalid timestamp " + el.getTimestamp(),
- el.getTimestamp() == 15_000L - 1 || el.getTimestamp() == 25_000L -
1);
- return Collections.singleton(new TimeInterval(0, Long.MAX_VALUE));
+ "Invalid timestamp " + timestamp,
+ timestamp == 15_000L - 1 || timestamp == 25_000L - 1);
+
+ return Collections.singleton(GlobalWindow.INSTANCE);
}
- @SuppressWarnings("unchecked")
@Override
- public Trigger<TimeInterval> getTrigger() {
- return new CountTrigger(1) {
- @Override
- public boolean isStateful() {
- return false;
- }
+ public void mergeWindows(MergeContext c) throws Exception {
- @Override
- public TriggerResult onElement(long time, Window window,
TriggerContext ctx) {
- // ~ we expect the 'time' to be the end of the window which
produced the
- // element in the preceding upstream (stateful and windowed)
operator
- assertTrue("Invalid timestamp " + time, time == 15_000L - 1 ||
time == 25_000L - 1);
- return super.onElement(time, window, ctx);
- }
- };
}
@Override
- public boolean equals(Object obj) {
- return obj instanceof AssertingWindowing;
+ public boolean isCompatible(WindowFn<?, ?> other) {
+ return other instanceof GlobalWindows;
+ }
+
+ @Override
+ public Coder<BoundedWindow> windowCoder() {
+ return new KryoCoder<>();
}
@Override
- public int hashCode() {
- return 0;
+ @Nullable
+ public WindowMappingFn<BoundedWindow> getDefaultWindowMappingFn() {
+ return null;
}
}
@@ -612,7 +656,11 @@ public int hashCode() {
.keyBy(e -> "", TypeHint.ofString())
.valueBy(Pair::getFirst, TypeHint.ofInt())
.combineBy(Sums.ofInts(), TypeHint.ofInt())
- .windowBy(Time.of(Duration.ofSeconds(5)))
+// .windowBy(Time.of(Duration.ofSeconds(5)))
+ .windowBy(BeamWindowing.of(
+
FixedWindows.of(org.joda.time.Duration.standardSeconds(5)),
+ AfterWatermark.pastEndOfWindow(),
+ AccumulationMode.DISCARDING_FIRED_PANES))
.output();
// ~ now use a custom windowing with a trigger which does
// the assertions subject to this test (use RSBK which has to
@@ -623,7 +671,10 @@ public int hashCode() {
.valueBy(Pair::getSecond)
.stateFactory(SumState::new)
.mergeStatesBy(SumState::combine)
- .windowBy(new AssertingWindowing<>())
+ .windowBy(BeamWindowing.of(
+ new AssertingWindowFn<>(),
+ AfterWatermark.pastEndOfWindow(),
+ AccumulationMode.DISCARDING_FIRED_PANES))
.output();
return FlatMap.of(output)
.using(
@@ -651,7 +702,11 @@ public void testReduceByKeyWithWrongHashCodeImpl() {
.keyBy(Pair::getFirst)
.valueBy(e -> 1L)
.combineBy(Sums.ofLongs())
- .windowBy(Time.of(Duration.ofSeconds(1)))
+// .windowBy(Time.of(Duration.ofSeconds(1)))
+ .windowBy(BeamWindowing.of(
+ FixedWindows.of(org.joda.time.Duration.standardSeconds(1)),
+ AfterWatermark.pastEndOfWindow(),
+ AccumulationMode.DISCARDING_FIRED_PANES))
.output();
}
@@ -703,7 +758,11 @@ public void testAccumulators() {
}
ctx.collect(a + b);
}))
- .windowBy(GlobalWindowing.get())
+// .windowBy(GlobalWindowing.get())
+ .windowBy(BeamWindowing.of(
+ new GlobalWindows(),
+ AfterWatermark.pastEndOfWindow(),
+ AccumulationMode.DISCARDING_FIRED_PANES))
.output();
}
@@ -745,8 +804,101 @@ public int hashCode() {
}
}
+ private static class TestWindowFn extends WindowFn<Number, CountWindow> {
+
+ @Override
+ public Collection<CountWindow> assignWindows(AssignContext c) throws
Exception {
+ Number element = c.element();
+ return Collections.singleton(new CountWindow(element.longValue() / 4));
+ }
+
+ @Override
+ public void mergeWindows(MergeContext c) throws Exception {
+
+ }
+
+ @Override
+ public boolean isNonMerging() {
+ return true;
+ }
+
+ @Override
+ public boolean isCompatible(WindowFn<?, ?> other) {
+ return false;
+ }
+
+ @Override
+ public Coder<CountWindow> windowCoder() {
+ return new KryoCoder<>();
+ }
+
+ @Override
+ @Nullable
+ public WindowMappingFn<CountWindow> getDefaultWindowMappingFn() {
+ return null;
+ }
+ }
+
// ~
------------------------------------------------------------------------------
+ private static class CountWindow extends BoundedWindow {
+
+ private long value;
+
+ public CountWindow(long value) {
+ this.value = value;
+ }
+
+ @Override
+ public Instant maxTimestamp() {
+ return GlobalWindow.INSTANCE.maxTimestamp();
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other instanceof CountWindow) {
+ return value == (((CountWindow) other).value);
+ }
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ return Long.hashCode(value);
+ }
+ }
+
+ private static class UniqueWindow extends BoundedWindow {
+
+ private static final AtomicInteger idCounter = new AtomicInteger();
+ private final int id;
+
+ public UniqueWindow() {
+ this.id = idCounter.getAndIncrement();
+ }
+
+ @Override
+ public Instant maxTimestamp() {
+ return GlobalWindow.INSTANCE.maxTimestamp();
+ }
+
+ @Override
+ public int hashCode() {
+ return Integer.hashCode(id);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ return obj instanceof UniqueWindow
+ && this.id == ((UniqueWindow) obj).id;
+ }
+
+ @Override
+ public String toString() {
+ return "UniqueWindow{id=" + id + "}";
+ }
+ }
+
/**
* Every instance is unique: this allows us to exercise merging.
*/
@@ -791,6 +943,7 @@ public String toString() {
// count windowing; firing based on window.bucket (size of the window)
static final class CWindowTrigger implements Trigger<CWindow> {
+
private final ValueStorageDescriptor<Long> countDesc =
ValueStorageDescriptor.of("count", Long.class, 0L, (x, y) -> x + y);
@@ -820,7 +973,62 @@ public void onMerge(CWindow w,
TriggerContext.TriggerMergeContext ctx) {
}
}
+ private static class MergingByBucketSizeWindowFn<T> extends WindowFn<T,
UniqueWindow> {
+
+ private final int bucketSize;
+
+ private MergingByBucketSizeWindowFn(int bucketSize) {
+ this.bucketSize = bucketSize;
+ }
+
+ @Override
+ public Collection<UniqueWindow> assignWindows(AssignContext c) throws
Exception {
+ return Collections.singleton(new UniqueWindow());
+ }
+
+ @Override
+ public void mergeWindows(MergeContext c) throws Exception {
+
+// merge windows up to bucket size
+ Collection<UniqueWindow> windows = c.windows();
+ List<UniqueWindow> merges = new ArrayList<>();
+ for (UniqueWindow w : windows) {
+
+ merges.add(w);
+
+ if (merges.size() == bucketSize) { // time to merge
+ c.merge(merges, w);
+ merges.clear();
+ }
+
+ }
+
+ if (merges.size() > 1) {
+ c.merge(merges, merges.get(merges.size() - 1));
+ }
+
+ }
+
+ @Override
+ public boolean isCompatible(WindowFn<?, ?> other) {
+ return other instanceof MergingByBucketSizeWindowFn
+ && this.bucketSize == ((MergingByBucketSizeWindowFn)
other).bucketSize;
+ }
+
+ @Override
+ public Coder<UniqueWindow> windowCoder() {
+ return new KryoCoder<>();
+ }
+
+ @Override
+ @Nullable
+ public WindowMappingFn<UniqueWindow> getDefaultWindowMappingFn() {
+ return null;
+ }
+ }
+
static final class CWindowing<T> implements MergingWindowing<T, CWindow> {
+
private final int size;
CWindowing(int size) {
@@ -869,6 +1077,7 @@ public int hashCode() {
}
static class SumState implements State<Integer, Integer> {
+
private final ValueStorage<Integer> sum;
SumState(StateContext context, Collector<Integer> collector) {
@@ -900,7 +1109,9 @@ public void close() {
}
}
- /** String with invalid hash code implementation returning constant. */
+ /**
+ * String with invalid hash code implementation returning constant.
+ */
public static class Word {
private final String str;
diff --git
a/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/BeamOperatorsSuite.java
b/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/suite/OperatorsTestSuite.java
similarity index 78%
rename from
sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/BeamOperatorsSuite.java
rename to
sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/suite/OperatorsTestSuite.java
index 2a33cfae35a..6c62cfbf4db 100644
---
a/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/BeamOperatorsSuite.java
+++
b/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/suite/OperatorsTestSuite.java
@@ -15,9 +15,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.sdk.extensions.euphoria.beam.testkit;
+package org.apache.beam.sdk.extensions.euphoria.operator.test.suite;
import org.apache.beam.sdk.extensions.euphoria.operator.test.FlatMapTest;
+import org.apache.beam.sdk.extensions.euphoria.operator.test.JoinTest;
+import org.apache.beam.sdk.extensions.euphoria.operator.test.ReduceByKeyTest;
import org.apache.beam.sdk.extensions.euphoria.operator.test.UnionTest;
import
org.apache.beam.sdk.extensions.euphoria.operator.test.junit.ExecutorProvider;
import
org.apache.beam.sdk.extensions.euphoria.operator.test.junit.ExecutorProviderRunner;
@@ -25,21 +27,19 @@
import org.junit.runners.Suite;
/**
- * This is a copy of
- * {@link
org.apache.beam.sdk.extensions.euphoria.operator.test.AllOperatorsSuite} to
allow us track
- * progress on incrementally implementing operator and their tests.
- * TODO: When done, this class should go away and original should be used
instead
+ * Euphoria operators test suite.
*/
@RunWith(ExecutorProviderRunner.class)
@Suite.SuiteClasses({
+ // BroadcastHashJoinTest.class,
// CountByKeyTest.class,
// DistinctTest.class,
// FilterTest.class,
FlatMapTest.class,
- // JoinTest.class,
+ JoinTest.class,
// JoinWindowEnforcementTest.class,
// MapElementsTest.class,
- // ReduceByKeyTest.class,
+ ReduceByKeyTest.class,
// ReduceStateByKeyTest.class,
// SumByKeyTest.class,
// TopPerKeyTest.class,
@@ -48,6 +48,6 @@
// WindowingTest.class,
// WatermarkTest.class,
})
-public abstract class BeamOperatorsSuite implements ExecutorProvider {
+public abstract class OperatorsTestSuite implements ExecutorProvider {
}
diff --git
a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/suite/package-info.java
b/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/suite/package-info.java
new file mode 100644
index 00000000000..09e896b89ea
--- /dev/null
+++
b/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/suite/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Test suite package.
+ */
+package org.apache.beam.sdk.extensions.euphoria.operator.test.suite;
diff --git
a/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/BeamExecutorProvider.java
b/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/test/java/org/apache/beam/sdk/extensions/euphoria/operator/test/BeamExecutorProvider.java
similarity index 96%
rename from
sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/BeamExecutorProvider.java
rename to
sdks/java/extensions/euphoria/euphoria-operator-testkit/src/test/java/org/apache/beam/sdk/extensions/euphoria/operator/test/BeamExecutorProvider.java
index ddac9f5af93..0181ed50011 100644
---
a/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/org/apache/beam/sdk/extensions/euphoria/beam/testkit/BeamExecutorProvider.java
+++
b/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/test/java/org/apache/beam/sdk/extensions/euphoria/operator/test/BeamExecutorProvider.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.sdk.extensions.euphoria.beam.testkit;
+package org.apache.beam.sdk.extensions.euphoria.operator.test;
import java.time.Duration;
import org.apache.beam.sdk.extensions.euphoria.beam.BeamExecutor;
@@ -30,6 +30,7 @@
*/
public interface BeamExecutorProvider extends ExecutorProvider {
+ @Override
default ExecutorEnvironment newExecutorEnvironment() throws Exception {
final String[] args = {"--runner=DirectRunner"};
final PipelineOptions options =
PipelineOptionsFactory.fromArgs(args).as(PipelineOptions.class);
diff --git
a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/AllOperatorsSuite.java
b/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/test/java/org/apache/beam/sdk/extensions/euphoria/operator/test/OperatorsTest.java
similarity index 51%
rename from
sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/AllOperatorsSuite.java
rename to
sdks/java/extensions/euphoria/euphoria-operator-testkit/src/test/java/org/apache/beam/sdk/extensions/euphoria/operator/test/OperatorsTest.java
index 65d6f7c65da..c3748ff53a7 100644
---
a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/org/apache/beam/sdk/extensions/euphoria/operator/test/AllOperatorsSuite.java
+++
b/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/test/java/org/apache/beam/sdk/extensions/euphoria/operator/test/OperatorsTest.java
@@ -17,28 +17,11 @@
*/
package org.apache.beam.sdk.extensions.euphoria.operator.test;
-import
org.apache.beam.sdk.extensions.euphoria.operator.test.junit.ExecutorProvider;
-import
org.apache.beam.sdk.extensions.euphoria.operator.test.junit.ExecutorProviderRunner;
-import org.junit.runner.RunWith;
-import org.junit.runners.Suite;
+import
org.apache.beam.sdk.extensions.euphoria.operator.test.suite.OperatorsTestSuite;
-/** Subclass this class to have the whole suite operate on your executor of
choice. */
-@RunWith(ExecutorProviderRunner.class)
-@Suite.SuiteClasses({
- BroadcastHashJoinTest.class,
- CountByKeyTest.class,
- DistinctTest.class,
- FilterTest.class,
- FlatMapTest.class,
- JoinTest.class,
- JoinWindowEnforcementTest.class,
- MapElementsTest.class,
- ReduceByKeyTest.class,
- ReduceStateByKeyTest.class,
- SumByKeyTest.class,
- TopPerKeyTest.class,
- UnionTest.class,
- WindowingTest.class,
- WatermarkTest.class,
-})
-public abstract class AllOperatorsSuite implements ExecutorProvider {}
+/**
+ * Test which runs whole {@link OperatorsTestSuite}.
+ */
+public class OperatorsTest extends OperatorsTestSuite implements
BeamExecutorProvider {
+
+}
diff --git a/sdks/java/maven-archetypes/examples/build.gradle
b/sdks/java/maven-archetypes/examples/build.gradle
index 79139c4725d..b9844cd207a 100644
--- a/sdks/java/maven-archetypes/examples/build.gradle
+++ b/sdks/java/maven-archetypes/examples/build.gradle
@@ -61,17 +61,6 @@ task generateSources(type: Exec) {
commandLine './generate-sources.sh'
}
-// Add archetype testing/generation to existing GradleBuild PreCommit
-// https://issues.apache.org/jira/browse/BEAM-3256
-task generateAndBuildArchetypeTest(type: Exec) {
- if (project.hasProperty("maven_home")) {
- commandLine "${maven_home}/bin/mvn", 'clean', 'install'
- environment "MAVEN_HOME", "${maven_home}"
- } else {
- commandLine 'mvn', 'clean', 'install'
- }
-}
-
sourceSets {
main {
output.dir('src', builtBy: 'generateSources')
diff --git a/sdks/java/maven-archetypes/starter/build.gradle
b/sdks/java/maven-archetypes/starter/build.gradle
index 01d02861345..7f908494d79 100644
--- a/sdks/java/maven-archetypes/starter/build.gradle
+++ b/sdks/java/maven-archetypes/starter/build.gradle
@@ -23,17 +23,6 @@ description = "Apache Beam :: SDKs :: Java :: Maven
Archetypes :: Starter"
ext.summary = """A Maven archetype to create a simple starter pipeline to
get started using the Apache Beam Java SDK."""
-// Add archetype testing/generation to existing GradleBuild PreCommit
-// https://issues.apache.org/jira/browse/BEAM-3256
-task generateAndBuildArchetypeTest(type: Exec) {
- if (project.hasProperty("maven_home")) {
- commandLine "${maven_home}/bin/mvn", 'clean', 'install'
- environment "MAVEN_HOME", "${maven_home}"
- } else {
- commandLine 'mvn', 'clean', 'install'
- }
-}
-
dependencies {
shadow project(path: ":beam-runners-direct-java", configuration: "shadow")
shadow project(path: ":beam-sdks-java-core", configuration: "shadow")
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
Issue Time Tracking
-------------------
Worklog Id: (was: 108519)
Time Spent: 20m (was: 10m)
> Introduce Euphoria Java 8 DSL
> -----------------------------
>
> Key: BEAM-3900
> URL: https://issues.apache.org/jira/browse/BEAM-3900
> Project: Beam
> Issue Type: New Feature
> Components: dsl-euphoria
> Reporter: David Moravek
> Assignee: David Moravek
> Priority: Major
> Time Spent: 20m
> Remaining Estimate: 0h
>
> This is the umbrella issue for integrating [Euphoria
> API|http://github.com/seznam/euphoria] into Beam.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)