[ https://issues.apache.org/jira/browse/BEAM-6054?focusedWorklogId=165487&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165487 ]
ASF GitHub Bot logged work on BEAM-6054:
----------------------------------------
Author: ASF GitHub Bot
Created on: 13/Nov/18 16:06
Start Date: 13/Nov/18 16:06
Worklog Time Spent: 10m
Work Description: dmvk closed pull request #7019: [BEAM-6054] Euphoria
translation providers refactored.
URL: https://github.com/apache/beam/pull/7019
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a fork-based pull request once
it is merged, the diff is reproduced below for the sake of provenance:
diff --git
a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/base/Operator.java
b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/base/Operator.java
index 601a0f72898..4f30359e225 100644
---
a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/base/Operator.java
+++
b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/base/Operator.java
@@ -44,7 +44,7 @@ protected Operator(@Nullable String name, @Nullable
TypeDescriptor<OutputT> outp
*
* @return maybe name
*/
- public final Optional<String> getName() {
+ public Optional<String> getName() {
return Optional.ofNullable(name);
}
@@ -52,4 +52,9 @@ protected Operator(@Nullable String name, @Nullable
TypeDescriptor<OutputT> outp
public Optional<TypeDescriptor<OutputT>> getOutputType() {
return Optional.ofNullable(outputType);
}
+
+ @Override
+ public String toString() {
+ return this.getClass().getSimpleName() + " operator{" + "name='" + name +
'\'' + '}';
+ }
}
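
The toString() added to Operator above can be tried out in isolation. The following is a minimal sketch, not Beam code: SketchOperator, SketchJoin and OperatorToStringSketch are hypothetical stand-ins invented for illustration, and only the string concatenation is copied from the diff.

```java
import java.util.Optional;

// Hypothetical demo entry point; prints the rendered form of two operators.
class OperatorToStringSketch {
  public static void main(String[] args) {
    System.out.println(new SketchJoin("broadcast-leftJoin"));
    System.out.println(new SketchJoin(null));
  }
}

// Hypothetical stand-in for the Euphoria Operator base class (not the Beam type).
abstract class SketchOperator {
  private final String name; // may be null, mirroring the @Nullable name in the diff

  SketchOperator(String name) {
    this.name = name;
  }

  Optional<String> getName() {
    return Optional.ofNullable(name);
  }

  @Override
  public String toString() {
    // Same concatenation as in the diff: simple class name plus the raw name field.
    return this.getClass().getSimpleName() + " operator{" + "name='" + name + '\'' + '}';
  }
}

// Hypothetical concrete operator, used only for illustration.
class SketchJoin extends SketchOperator {
  SketchJoin(String name) {
    super(name);
  }
}
```

Because the raw field is concatenated rather than the Optional returned by getName(), an unnamed operator renders as name='null' in this sketch.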
diff --git
a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/EuphoriaOptions.java
b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/EuphoriaOptions.java
index f0621250183..067533b66b5 100644
---
a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/EuphoriaOptions.java
+++
b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/EuphoriaOptions.java
@@ -19,10 +19,7 @@
import com.fasterxml.jackson.annotation.JsonIgnore;
import
org.apache.beam.sdk.extensions.euphoria.core.client.accumulators.AccumulatorProvider;
-import org.apache.beam.sdk.extensions.euphoria.core.client.operator.FlatMap;
-import org.apache.beam.sdk.extensions.euphoria.core.client.operator.Join;
-import
org.apache.beam.sdk.extensions.euphoria.core.client.operator.ReduceByKey;
-import org.apache.beam.sdk.extensions.euphoria.core.client.operator.Union;
+import
org.apache.beam.sdk.extensions.euphoria.core.translate.provider.GenericTranslatorProvider;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.DefaultValueFactory;
import org.apache.beam.sdk.options.Description;
@@ -36,12 +33,7 @@
@Override
public TranslatorProvider create(PipelineOptions options) {
- return SimpleTranslatorProvider.newBuilder()
- .registerTranslator(FlatMap.class, new FlatMapTranslator<>())
- .registerTranslator(Union.class, new UnionTranslator<>())
- .registerTranslator(ReduceByKey.class, new ReduceByKeyTranslator<>())
- .registerTranslator(Join.class, new JoinTranslator<>())
- .build();
+ return GenericTranslatorProvider.createWithDefaultTranslators();
}
}
diff --git
a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/OperatorTranslator.java
b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/OperatorTranslator.java
index 404a3d20056..9d692511582 100644
---
a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/OperatorTranslator.java
+++
b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/OperatorTranslator.java
@@ -24,9 +24,9 @@
/**
* A functor to translate an operator into a beam execution.
*
- * @param <InputT> the type of input
- * @param <OutputT> the type of output
- * @param <OperatorT> the type of the user defined euphoria operator definition
+ * @param <InputT> the type of input elements
+ * @param <OutputT> the type of output elements
+ * @param <OperatorT> the type of the euphoria operator
*/
public interface OperatorTranslator<InputT, OutputT, OperatorT extends
Operator> {
diff --git
a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/SimpleTranslatorProvider.java
b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/SimpleTranslatorProvider.java
deleted file mode 100644
index 179887282cb..00000000000
---
a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/SimpleTranslatorProvider.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.euphoria.core.translate;
-
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.Multimap;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Optional;
-import
org.apache.beam.sdk.extensions.euphoria.core.client.operator.base.Operator;
-
-/**
- * Default translation provider that selects first matching translation for
the registered operator.
- */
-public class SimpleTranslatorProvider implements TranslatorProvider {
-
- /**
- * Create a new builder for provider.
- *
- * @return builder
- */
- public static Builder newBuilder() {
- return new Builder();
- }
-
- /** {@link SimpleTranslatorProvider} builder. */
- public static class Builder {
-
- private final Multimap<Class<? extends Operator>, OperatorTranslator<?, ?,
?>> translators =
- ArrayListMultimap.create();
-
- private Builder() {}
-
- public Builder registerTranslator(
- Class<? extends Operator> clazz, OperatorTranslator<?, ?, ?>
operatorTranslator) {
- translators.put(clazz, operatorTranslator);
- return this;
- }
-
- public SimpleTranslatorProvider build() {
- return new SimpleTranslatorProvider(translators);
- }
- }
-
- private final Multimap<Class<? extends Operator>, OperatorTranslator<?, ?,
?>> translators;
-
- private SimpleTranslatorProvider(
- Multimap<Class<? extends Operator>, OperatorTranslator<?, ?, ?>>
translators) {
- this.translators = translators;
- }
-
- @Override
- public <InputT, OutputT, OperatorT extends Operator<OutputT>>
- Optional<OperatorTranslator<InputT, OutputT, OperatorT>>
findTranslator(OperatorT operator) {
- @SuppressWarnings("unchecked")
- final List<OperatorTranslator<InputT, OutputT, OperatorT>> candidates =
- new ArrayList<>((Collection) translators.get(operator.getClass()));
- if (!candidates.isEmpty()) {
- for (OperatorTranslator<InputT, OutputT, OperatorT> candidate :
candidates) {
- if (candidate.canTranslate(operator)) {
- return Optional.of(candidate);
- }
- }
- }
- // try to fallback to composite translator
- final OperatorTranslator<InputT, OutputT, OperatorT> fallbackTranslator =
- new CompositeOperatorTranslator<>();
- if (fallbackTranslator.canTranslate(operator)) {
- return Optional.of(fallbackTranslator);
- }
- return Optional.empty();
- }
-}
diff --git
a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/UnionTranslator.java
b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/UnionTranslator.java
index 0c967c289e4..f111916d507 100644
---
a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/UnionTranslator.java
+++
b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/UnionTranslator.java
@@ -24,7 +24,7 @@
import org.apache.beam.sdk.values.TypeDescriptor;
/** Euphoria to Beam translation of {@link Union} operator. */
-class UnionTranslator<InputT> implements OperatorTranslator<InputT, InputT,
Union<InputT>> {
+public class UnionTranslator<InputT> implements OperatorTranslator<InputT,
InputT, Union<InputT>> {
@Override
public PCollection<InputT> translate(Union<InputT> operator,
PCollectionList<InputT> inputs) {
diff --git
a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/provider/CompositeProvider.java
b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/provider/CompositeProvider.java
new file mode 100644
index 00000000000..11887911c17
--- /dev/null
+++
b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/provider/CompositeProvider.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.euphoria.core.translate.provider;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import
org.apache.beam.sdk.extensions.euphoria.core.annotation.stability.Experimental;
+import
org.apache.beam.sdk.extensions.euphoria.core.client.operator.base.Operator;
+import
org.apache.beam.sdk.extensions.euphoria.core.translate.OperatorTranslator;
+import
org.apache.beam.sdk.extensions.euphoria.core.translate.TranslatorProvider;
+
+/**
+ * An implementation of {@link TranslatorProvider} which allows stacking other {@link
+ * TranslatorProvider TranslatorProviders} in the order given at construction time.
+ */
+@Experimental
+public class CompositeProvider implements TranslatorProvider {
+
+ private final List<TranslatorProvider> orderedTranslatorsChain;
+
+ public static CompositeProvider of(List<TranslatorProvider>
orderedTranslatorsChain) {
+ return new CompositeProvider(orderedTranslatorsChain);
+ }
+
+ public static CompositeProvider of(TranslatorProvider...
orderedTranslatorsChain) {
+ requireNonNull(orderedTranslatorsChain);
+ return new CompositeProvider(Arrays.asList(orderedTranslatorsChain));
+ }
+
+ private CompositeProvider(List<TranslatorProvider> orderedTranslatorsChain) {
+ requireNonNull(orderedTranslatorsChain);
+ this.orderedTranslatorsChain =
Collections.unmodifiableList(orderedTranslatorsChain);
+ }
+
+ /**
+ * Returns first {@code Optional<OperatorTranslator<InputT, OutputT,
OperatorT>>} which {@link
+ * OperatorTranslator#canTranslate(Operator) can translate} given operator.
+ *
+ * <p>Translators are acquired by calling {@link
TranslatorProvider#findTranslator(Operator)} on
+ * {@link TranslatorProvider TranslatorProviders} from list given at
construction time in given
+ * order.
+ *
+ * @param operator operator to translate
+ * @param <InputT> the type of input elements
+ * @param <OutputT> the type of output elements
+ * @param <OperatorT> the type of the euphoria operator
+ * @return first {@code Optional<OperatorTranslator<InputT, OutputT,
OperatorT>>} which {@link
+ * OperatorTranslator#canTranslate(Operator) can translate} given
operator.
+ */
+ @Override
+ public <InputT, OutputT, OperatorT extends Operator<OutputT>>
+ Optional<OperatorTranslator<InputT, OutputT, OperatorT>>
findTranslator(OperatorT operator) {
+
+ for (TranslatorProvider provider : orderedTranslatorsChain) {
+ Optional<OperatorTranslator<InputT, OutputT, OperatorT>> maybeTranslator
=
+ provider.findTranslator(operator);
+
+ if (maybeTranslator.isPresent() &&
maybeTranslator.get().canTranslate(operator)) {
+ return maybeTranslator;
+ }
+ }
+
+ return Optional.empty();
+ }
+}
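
The chaining behaviour of CompositeProvider above can be illustrated without any Beam dependency. This is a simplified sketch under stated assumptions: operators are modelled as plain strings, a translator is modelled as a label, and SketchProvider, SketchCompositeProvider and CompositeProviderSketch are hypothetical names. The extra canTranslate check performed by the real class is omitted here.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Hypothetical demo wiring: a custom provider is asked first, the default one second.
final class CompositeProviderSketch {
  static SketchProvider example() {
    SketchProvider custom =
        op ->
            op.toLowerCase().startsWith("broadcast")
                ? Optional.of("BroadcastHashJoinTranslator")
                : Optional.<String>empty();
    SketchProvider defaults = op -> Optional.of("JoinTranslator");
    return new SketchCompositeProvider(custom, defaults);
  }

  public static void main(String[] args) {
    System.out.println(example().findTranslator("broadcast-leftJoin"));
    System.out.println(example().findTranslator("plainJoin"));
  }
}

// Hypothetical, simplified provider: operator is a String, translator is a label.
interface SketchProvider {
  Optional<String> findTranslator(String operator);
}

// Asks each provider in the order given at construction time.
final class SketchCompositeProvider implements SketchProvider {
  private final List<SketchProvider> orderedChain;

  SketchCompositeProvider(SketchProvider... orderedChain) {
    this.orderedChain = Arrays.asList(orderedChain);
  }

  @Override
  public Optional<String> findTranslator(String operator) {
    for (SketchProvider provider : orderedChain) {
      Optional<String> maybeTranslator = provider.findTranslator(operator);
      if (maybeTranslator.isPresent()) {
        return maybeTranslator; // the first provider with an answer wins
      }
    }
    return Optional.empty();
  }
}
```

The order of the arguments is what makes overriding possible: the more specific provider has to come before the default one, as in the BroadcastHashJoinTest setup in this PR.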
diff --git
a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/provider/GenericTranslatorProvider.java
b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/provider/GenericTranslatorProvider.java
new file mode 100644
index 00000000000..6f44e325784
--- /dev/null
+++
b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/provider/GenericTranslatorProvider.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.euphoria.core.translate.provider;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Predicate;
+import
org.apache.beam.sdk.extensions.euphoria.core.annotation.stability.Experimental;
+import
org.apache.beam.sdk.extensions.euphoria.core.client.operator.CompositeOperator;
+import org.apache.beam.sdk.extensions.euphoria.core.client.operator.FlatMap;
+import org.apache.beam.sdk.extensions.euphoria.core.client.operator.Join;
+import
org.apache.beam.sdk.extensions.euphoria.core.client.operator.ReduceByKey;
+import org.apache.beam.sdk.extensions.euphoria.core.client.operator.Union;
+import
org.apache.beam.sdk.extensions.euphoria.core.client.operator.base.Operator;
+import
org.apache.beam.sdk.extensions.euphoria.core.translate.CompositeOperatorTranslator;
+import
org.apache.beam.sdk.extensions.euphoria.core.translate.FlatMapTranslator;
+import org.apache.beam.sdk.extensions.euphoria.core.translate.JoinTranslator;
+import
org.apache.beam.sdk.extensions.euphoria.core.translate.OperatorTranslator;
+import
org.apache.beam.sdk.extensions.euphoria.core.translate.ReduceByKeyTranslator;
+import
org.apache.beam.sdk.extensions.euphoria.core.translate.TranslatorProvider;
+import org.apache.beam.sdk.extensions.euphoria.core.translate.UnionTranslator;
+
+/**
+ * Adjustable {@link TranslatorProvider} that selects first suitable
translation for the registered
+ * operator.
+ *
+ * <p>{@link OperatorTranslator Translators} can be added by calling variants
of {@link
+ * GenericTranslatorProvider.Builder#register(Class, OperatorTranslator)
register} method during
+ * build. Order of registration is important. Building is started by {@link
#newBuilder()}.
+ */
+@Experimental
+public class GenericTranslatorProvider implements TranslatorProvider {
+
+ public static GenericTranslatorProvider createWithDefaultTranslators() {
+ return GenericTranslatorProvider.newBuilder()
+ .register(FlatMap.class, new FlatMapTranslator<>())
+ .register(Union.class, new UnionTranslator<>())
+ .register(ReduceByKey.class, new ReduceByKeyTranslator<>())
+ .register(Join.class, new JoinTranslator<>())
+ // register fallback operator translator to decompose composite operators
+ .register(op -> op instanceof CompositeOperator, new
CompositeOperatorTranslator<>())
+ .build();
+ }
+
+ /**
+ * Create a new builder for provider.
+ *
+ * @return builder
+ */
+ public static Builder newBuilder() {
+ return new Builder();
+ }
+
+ /** {@link GenericTranslatorProvider} builder. */
+ public static class Builder {
+
+ private final List<TranslationDescriptor> possibleTranslators = new
ArrayList<>();
+
+ private Builder() {}
+
+ /**
+ * Registers given {@link OperatorTranslator} to be used for given
operator type.
+ *
+ * @param clazz class of an {@link Operator} to be translated
+ * @param operatorTranslator translator to register
+ * @param <OperatorT> type of an {@link Operator} to be translated
+ * @return this builder
+ */
+ public <OperatorT extends Operator<?>> Builder register(
+ Class<OperatorT> clazz, OperatorTranslator<?, ?, ? extends OperatorT>
operatorTranslator) {
+ possibleTranslators.add(TranslationDescriptor.of(clazz,
operatorTranslator));
+ return this;
+ }
+
+ /**
+ * Register given {@link OperatorTranslator} to be used for operator type
when given {@link
+ * Predicate} holds.
+ *
+ * @param clazz class of an {@link Operator} to be translated
+ * @param predicate user defined predicate which is tested to be true in
order to apply
+ * translator to an {@link Operator}
+ * @param operatorTranslator translator to register
+ * @param <OperatorT> type of an {@link Operator} to be translated
+ * @return this builder
+ */
+ public <OperatorT extends Operator<?>> Builder register(
+ Class<OperatorT> clazz,
+ Predicate<OperatorT> predicate,
+ OperatorTranslator<?, ?, ? extends OperatorT> operatorTranslator) {
+ possibleTranslators.add(TranslationDescriptor.of(clazz, predicate,
operatorTranslator));
+ return this;
+ }
+
+ /**
+ * Registers given {@link OperatorTranslator} to be used for any operator
when given {@link
+ * Predicate} holds.
+ *
+ * @param predicate user defined predicate which has to hold in order to apply the
+ * translator to an {@link Operator}. Note that the predicate has to be able to test any
+ * {@link Operator} instance, including subtypes.
+ * @param operatorTranslator translator to register
+ * @return this builder
+ */
+ public Builder register(
+ Predicate<Operator> predicate, OperatorTranslator<?, ?, Operator>
operatorTranslator) {
+ possibleTranslators.add(TranslationDescriptor.of(predicate,
operatorTranslator));
+ return this;
+ }
+
+ public GenericTranslatorProvider build() {
+ return new GenericTranslatorProvider(possibleTranslators);
+ }
+ }
+
+ /**
+ * Container for an optional {@link Predicate user defined predicate}, an optional {@link Class}
+ * of an operator to be translated and the {@link OperatorTranslator} itself. The predicate and
+ * the operator class define optional checks. At least one of them has to be present.
+ *
+ * <p>The {@link OperatorTranslator} is allowed to translate an operator iff it passes all the
+ * checks and {@link OperatorTranslator#canTranslate(Operator) can translate} the given operator.
+ * That allows users to write translators specific to any operator.
+ *
+ * @param <OperatorT> the type of the euphoria operator
+ */
+ private static class TranslationDescriptor<OperatorT extends Operator<?>> {
+
+ /** Class of an {@link Operator} given {@link #translator} can be applied
on. */
+ private final Optional<Class<OperatorT>> operatorClass;
+
+ /**
+ * User specified predicate, which determines whether the given {@link #translator} can be used
+ * to translate an operator.
+ */
+ private final Optional<Predicate<OperatorT>> userDefinedPredicate;
+
+ private final OperatorTranslator<?, ?, OperatorT> translator;
+
+ private TranslationDescriptor(
+ Optional<Class<OperatorT>> operatorClass,
+ Optional<Predicate<OperatorT>> userDefinedPredicate,
+ OperatorTranslator<?, ?, ? extends OperatorT> translator) {
+ Preconditions.checkState(
+ operatorClass.isPresent() || userDefinedPredicate.isPresent(),
+ "At least user defined predicate or class of an operator have to be
given.");
+
+ @SuppressWarnings("unchecked")
+ OperatorTranslator<?, ?, OperatorT> castedTranslator =
+ (OperatorTranslator<?, ?, OperatorT>) translator;
+
+ this.operatorClass = operatorClass;
+ this.userDefinedPredicate = userDefinedPredicate;
+ this.translator = castedTranslator;
+ }
+
+ static <OperatorT extends Operator<?>> TranslationDescriptor<OperatorT> of(
+ Class<OperatorT> operatorClass,
+ Predicate<OperatorT> userDefinedPredicate,
+ OperatorTranslator<?, ?, ? extends OperatorT> translator) {
+ return new TranslationDescriptor<>(
+ Optional.of(requireNonNull(operatorClass)),
+ Optional.of(requireNonNull(userDefinedPredicate)),
+ requireNonNull(translator));
+ }
+
+ static <OperatorT extends Operator<?>> TranslationDescriptor<OperatorT> of(
+ Predicate<OperatorT> userDefinedPredicate,
+ OperatorTranslator<?, ?, ? extends OperatorT> translator) {
+ return new TranslationDescriptor<>(
+ Optional.empty(), Optional.of(userDefinedPredicate),
requireNonNull(translator));
+ }
+
+ static <OperatorT extends Operator<?>> TranslationDescriptor<OperatorT> of(
+ Class<OperatorT> operatorClass, OperatorTranslator<?, ?, ? extends
OperatorT> translator) {
+ return new TranslationDescriptor<>(
+ Optional.of(requireNonNull(operatorClass)), Optional.empty(),
requireNonNull(translator));
+ }
+
+ private boolean checkTranslatorSuitableFor(OperatorT operator) {
+
+ // optional class equality check
+ if (operatorClass.isPresent() &&
!operatorClass.get().equals(operator.getClass())) {
+ return false;
+ }
+
+ // optional user-defined predicate check
+ if (userDefinedPredicate.isPresent() &&
!userDefinedPredicate.get().test(operator)) {
+ return false;
+ }
+
+ // mandatory check by translator itself
+ return translator.canTranslate(operator);
+ }
+
+ Optional<OperatorTranslator<?, ?, OperatorT>>
getTranslatorWhenSuitable(OperatorT operator) {
+ if (checkTranslatorSuitableFor(operator)) {
+ return Optional.of(translator);
+ } else {
+ return Optional.empty();
+ }
+ }
+ }
+
+ private final List<TranslationDescriptor> possibleTranslators;
+
+ private GenericTranslatorProvider(List<TranslationDescriptor>
possibleTranslators) {
+ this.possibleTranslators = possibleTranslators;
+ }
+
+ @Override
+ public <InputT, OutputT, OperatorT extends Operator<OutputT>>
+ Optional<OperatorTranslator<InputT, OutputT, OperatorT>>
findTranslator(OperatorT operator) {
+
+ for (TranslationDescriptor descriptor : possibleTranslators) {
+
+ @SuppressWarnings("unchecked")
+ Optional<OperatorTranslator<InputT, OutputT, OperatorT>> maybeTranslator
=
+ descriptor.getTranslatorWhenSuitable(operator);
+
+ if (maybeTranslator.isPresent()) {
+ return maybeTranslator;
+ }
+ }
+
+ return Optional.empty();
+ }
+}
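
The lookup rules of GenericTranslatorProvider above (registration order, exact class equality, optional predicate) can be sketched as follows. This is an illustration under stated assumptions, not the Beam implementation: SketchOp, SketchJoinOp, SketchFlatMapOp, SketchRegistry and GenericProviderSketch are hypothetical, translators are reduced to labels, and the final canTranslate check is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;

// Hypothetical demo: registration order decides which entry is picked.
final class GenericProviderSketch {
  static SketchRegistry example() {
    return new SketchRegistry()
        // by class and predicate: only joins whose name starts with "broadcast"
        .register(
            SketchJoinOp.class,
            op -> op.name.toLowerCase().startsWith("broadcast"),
            "broadcast-join")
        // by class only
        .register(SketchJoinOp.class, null, "join")
        // by predicate only: fallback for everything else
        .register(null, op -> true, "fallback");
  }

  public static void main(String[] args) {
    System.out.println(example().find(new SketchJoinOp("Broadcast-leftJoin")));
    System.out.println(example().find(new SketchJoinOp("plain")));
    System.out.println(example().find(new SketchFlatMapOp("map")));
  }
}

// Hypothetical operator types standing in for Euphoria operators.
class SketchOp {
  final String name;

  SketchOp(String name) {
    this.name = name;
  }
}

class SketchJoinOp extends SketchOp {
  SketchJoinOp(String name) {
    super(name);
  }
}

class SketchFlatMapOp extends SketchOp {
  SketchFlatMapOp(String name) {
    super(name);
  }
}

final class SketchRegistry {
  private static final class Entry {
    final Class<?> type; // null means "any class"
    final Predicate<SketchOp> predicate; // null means "no extra predicate"
    final String translator;

    Entry(Class<?> type, Predicate<SketchOp> predicate, String translator) {
      this.type = type;
      this.predicate = predicate;
      this.translator = translator;
    }

    boolean matches(SketchOp op) {
      // exact class equality, as in checkTranslatorSuitableFor in the diff
      if (type != null && !type.equals(op.getClass())) {
        return false;
      }
      return predicate == null || predicate.test(op);
    }
  }

  private final List<Entry> entries = new ArrayList<>();

  SketchRegistry register(Class<?> type, Predicate<SketchOp> predicate, String translator) {
    entries.add(new Entry(type, predicate, translator));
    return this;
  }

  Optional<String> find(SketchOp op) {
    for (Entry entry : entries) {
      if (entry.matches(op)) {
        return Optional.of(entry.translator); // first suitable entry wins
      }
    }
    return Optional.empty();
  }
}
```

Swapping the first two registrations in this sketch would make the class-only entry shadow the predicate-based one, which is why the Javadoc above stresses that the order of registration is important.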
diff --git
a/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/provider/package-info.java
b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/provider/package-info.java
new file mode 100644
index 00000000000..d0e2b94c479
--- /dev/null
+++
b/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/provider/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Implementations of {@link
+ * org.apache.beam.sdk.extensions.euphoria.core.translate.TranslatorProvider}.
+ */
+package org.apache.beam.sdk.extensions.euphoria.core.translate.provider;
diff --git
a/sdks/java/extensions/euphoria/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/docs/DocumentationExamplesTest.java
b/sdks/java/extensions/euphoria/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/docs/DocumentationExamplesTest.java
index 84f31861b8b..8278c336772 100644
---
a/sdks/java/extensions/euphoria/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/docs/DocumentationExamplesTest.java
+++
b/sdks/java/extensions/euphoria/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/docs/DocumentationExamplesTest.java
@@ -29,6 +29,7 @@
import
org.apache.beam.sdk.extensions.euphoria.core.client.functional.UnaryFunction;
import org.apache.beam.sdk.extensions.euphoria.core.client.io.Collector;
import
org.apache.beam.sdk.extensions.euphoria.core.client.operator.AssignEventTime;
+import
org.apache.beam.sdk.extensions.euphoria.core.client.operator.CompositeOperator;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.CountByKey;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.Distinct;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.Filter;
@@ -43,8 +44,16 @@
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.SumByKey;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.TopPerKey;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.Union;
+import
org.apache.beam.sdk.extensions.euphoria.core.client.operator.base.Operator;
import org.apache.beam.sdk.extensions.euphoria.core.client.util.Fold;
import org.apache.beam.sdk.extensions.euphoria.core.client.util.Triple;
+import
org.apache.beam.sdk.extensions.euphoria.core.translate.BroadcastHashJoinTranslator;
+import
org.apache.beam.sdk.extensions.euphoria.core.translate.CompositeOperatorTranslator;
+import
org.apache.beam.sdk.extensions.euphoria.core.translate.FlatMapTranslator;
+import
org.apache.beam.sdk.extensions.euphoria.core.translate.OperatorTranslator;
+import
org.apache.beam.sdk.extensions.euphoria.core.translate.TranslatorProvider;
+import
org.apache.beam.sdk.extensions.euphoria.core.translate.provider.CompositeProvider;
+import
org.apache.beam.sdk.extensions.euphoria.core.translate.provider.GenericTranslatorProvider;
import org.apache.beam.sdk.extensions.kryo.KryoCoderProvider;
import org.apache.beam.sdk.extensions.kryo.KryoOptions;
import org.apache.beam.sdk.io.TextIO;
@@ -62,6 +71,7 @@
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule;
@@ -807,4 +817,51 @@ public void testTopPerKeyOperator() {
pipeline.run();
}
+
+ @Test
+ public void testGenericTranslatorProvider() {
+
+ GenericTranslatorProvider provider =
+ GenericTranslatorProvider.newBuilder()
+ .register(FlatMap.class, new FlatMapTranslator<>()) // register by
operator class
+ .register(
+ Join.class,
+ (Join op) -> {
+ String name = ((Optional<String>) op.getName()).orElse("");
+ return name.toLowerCase().startsWith("broadcast");
+ },
+ new BroadcastHashJoinTranslator<>()) // register by class and
predicate
+ .register(
+ op -> op instanceof CompositeOperator,
+ new CompositeOperatorTranslator<>()) // register by predicate
only
+ .build();
+
+ Assert.assertNotNull(provider);
+ }
+
+ private static class CustomTranslatorProvider implements TranslatorProvider {
+
+ static CustomTranslatorProvider of() {
+ return new CustomTranslatorProvider();
+ }
+
+ @Override
+ public <InputT, OutputT, OperatorT extends Operator<OutputT>>
+ Optional<OperatorTranslator<InputT, OutputT, OperatorT>>
findTranslator(
+ OperatorT operator) {
+ return Optional.empty();
+ }
+ }
+
+ @Test
+ public void testCompositeTranslationProviderExample() {
+
+ CompositeProvider compositeProvider =
+ CompositeProvider.of(
+ CustomTranslatorProvider.of(), // first ask
CustomTranslatorProvider for translator
+ GenericTranslatorProvider
+ .createWithDefaultTranslators()); // then ask default provider
if needed
+
+ Assert.assertNotNull(compositeProvider);
+ }
}
diff --git
a/sdks/java/extensions/euphoria/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/testkit/BroadcastHashJoinTest.java
b/sdks/java/extensions/euphoria/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/testkit/BroadcastHashJoinTest.java
index 6f19e7a7f0b..cd840a71fbf 100644
---
a/sdks/java/extensions/euphoria/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/testkit/BroadcastHashJoinTest.java
+++
b/sdks/java/extensions/euphoria/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/testkit/BroadcastHashJoinTest.java
@@ -22,15 +22,14 @@
import java.util.Optional;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.euphoria.core.client.io.Collector;
-import org.apache.beam.sdk.extensions.euphoria.core.client.operator.FlatMap;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.Join;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.LeftJoin;
import
org.apache.beam.sdk.extensions.euphoria.core.client.operator.MapElements;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.RightJoin;
import
org.apache.beam.sdk.extensions.euphoria.core.translate.BroadcastHashJoinTranslator;
import org.apache.beam.sdk.extensions.euphoria.core.translate.EuphoriaOptions;
-import
org.apache.beam.sdk.extensions.euphoria.core.translate.FlatMapTranslator;
-import
org.apache.beam.sdk.extensions.euphoria.core.translate.SimpleTranslatorProvider;
+import
org.apache.beam.sdk.extensions.euphoria.core.translate.provider.CompositeProvider;
+import
org.apache.beam.sdk.extensions.euphoria.core.translate.provider.GenericTranslatorProvider;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
@@ -49,10 +48,18 @@
.getOptions()
.as(EuphoriaOptions.class)
.setTranslatorProvider(
- SimpleTranslatorProvider.newBuilder()
- .registerTranslator(FlatMap.class, new FlatMapTranslator<>())
- .registerTranslator(Join.class, new
BroadcastHashJoinTranslator<>())
- .build());
+ CompositeProvider.of(
+ GenericTranslatorProvider.newBuilder()
+ .register(
+ Join.class,
+ (Join op) -> {
+ String name = ((Optional<String>)
op.getName()).orElse("");
+ return name.toLowerCase().startsWith("broadcast");
+ },
+ new BroadcastHashJoinTranslator<>())
+ .build(),
+ GenericTranslatorProvider.createWithDefaultTranslators()));
+
return super.getOutput(pipeline);
}
}
@@ -65,7 +72,8 @@ public void leftBroadcastHashJoin() {
@Override
protected PCollection<KV<Integer, String>> getOutput(
PCollection<Integer> left, PCollection<Long> right) {
- return LeftJoin.of(left, MapElements.of(right).using(i -> i).output())
+ return LeftJoin.named("broadcast-leftJoin")
+ .of(left, MapElements.of(right).using(i -> i).output())
.by(e -> e, e -> (int) (e % 10))
.using(
(Integer l, Optional<Long> r, Collector<String> c) ->
@@ -118,7 +126,8 @@ public void rightBroadcastHashJoin() {
@Override
protected PCollection<KV<Integer, String>> getOutput(
PCollection<Integer> left, PCollection<Long> right) {
- return RightJoin.of(MapElements.of(left).using(i -> i).output(), right)
+ return RightJoin.named("BroadcastRightJoin")
+ .of(MapElements.of(left).using(i -> i).output(), right)
.by(e -> e, e -> (int) (e % 10))
.using(
(Optional<Integer> l, Long r, Collector<String> c) ->
@@ -171,7 +180,8 @@ public void keyHashCollisionBroadcastHashJoin() {
@Override
protected PCollection<KV<String, String>> getOutput(
PCollection<String> left, PCollection<Integer> right) {
- return LeftJoin.of(left, MapElements.of(right).using(i -> i).output())
+ return LeftJoin.named("Broadcast-leftJoin")
+ .of(left, MapElements.of(right).using(i -> i).output())
.by(e -> e, e -> e % 2 == 0 ? sameHashCodeKey2 : sameHashCodeKey1)
.using(
(String l, Optional<Integer> r, Collector<String> c) ->
diff --git a/sdks/java/extensions/euphoria/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/translate/provider/CompositeProviderTest.java b/sdks/java/extensions/euphoria/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/translate/provider/CompositeProviderTest.java
new file mode 100644
index 00000000000..31750510894
--- /dev/null
+++ b/sdks/java/extensions/euphoria/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/translate/provider/CompositeProviderTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.euphoria.core.translate.provider;
+
+import java.util.Optional;
+import org.apache.beam.sdk.extensions.euphoria.core.client.operator.base.Operator;
+import org.apache.beam.sdk.extensions.euphoria.core.translate.OperatorTranslator;
+import org.apache.beam.sdk.extensions.euphoria.core.translate.TranslatorProvider;
+import org.apache.beam.sdk.extensions.euphoria.core.translate.provider.ProviderTestUtils.TestOpTranslator;
+import org.apache.beam.sdk.extensions.euphoria.core.translate.provider.ProviderTestUtils.TestOperator;
+import org.junit.Test;
+
+/** Units tests of {@link CompositeProvider}. */
+public class CompositeProviderTest {
+
+ private final TestOperator testOperator = new TestOperator("test-operator");
+
+ @Test
+ public void testOneComposite() {
+ String translatorId = "first";
+ CompositeProvider compositeProvider =
+ CompositeProvider.of(TestProvider.ofTranslator(translatorId));
+
+ Optional<OperatorTranslator<Void, Void, TestOperator>> maybeTranslator =
+ compositeProvider.findTranslator(testOperator);
+
+ ProviderTestUtils.assertTranslator(translatorId, maybeTranslator, TestOpTranslator.class);
+ }
+
+ @Test
+ public void testCompositesOrder() {
+
+ String firstTranslatorId = "first";
+ String secondTranslatorId = "second";
+
+ CompositeProvider compositeProvider =
+ CompositeProvider.of(
+ TestProvider.ofTranslator(firstTranslatorId),
+ TestProvider.ofTranslator(secondTranslatorId));
+
+ Optional<OperatorTranslator<Void, Void, TestOperator>> maybeTranslator =
+ compositeProvider.findTranslator(testOperator);
+
+ ProviderTestUtils.assertTranslator(firstTranslatorId, maybeTranslator, TestOpTranslator.class);
+ }
+
+ @Test
+ public void testProviderCannotTranslate() {
+
+ String firstTranslatorId = "first";
+ String secondTranslatorId = "second";
+
+ CompositeProvider compositeProvider =
+ CompositeProvider.of(
+ TestProvider.ofTtranslatorWhichCannotTranslate(firstTranslatorId),
+ TestProvider.ofTranslator(secondTranslatorId));
+
+ Optional<OperatorTranslator<Void, Void, TestOperator>> maybeTranslator =
+ compositeProvider.findTranslator(testOperator);
+
+ ProviderTestUtils.assertTranslator(secondTranslatorId, maybeTranslator, TestOpTranslator.class);
+ }
+
+ private static class TestProvider implements TranslatorProvider {
+ private final OperatorTranslator<?, ?, ?> opTranslator;
+
+ private TestProvider(TestOpTranslator opTranslator) {
+ this.opTranslator = opTranslator;
+ }
+
+ static TestProvider ofTranslator(String translatorId) {
+ return new TestProvider(new TestOpTranslator(translatorId, true));
+ }
+
+ static TestProvider ofTtranslatorWhichCannotTranslate(String translatorId) {
+ return new TestProvider(new TestOpTranslator(translatorId, false));
+ }
+
+ @Override
+ public <InputT, OutputT, OperatorT extends Operator<OutputT>>
+ Optional<OperatorTranslator<InputT, OutputT, OperatorT>> findTranslator(
+ OperatorT operator) {
+ if (operator.getClass().equals(TestOperator.class)) {
+
+ @SuppressWarnings("unchecked")
+ OperatorTranslator<InputT, OutputT, OperatorT> opTranslator =
+ (OperatorTranslator<InputT, OutputT, OperatorT>) this.opTranslator;
+
+ return Optional.of(opTranslator);
+ }
+ return Optional.empty();
+ }
+ }
+}
diff --git a/sdks/java/extensions/euphoria/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/translate/provider/GenericTranslatorProviderTest.java b/sdks/java/extensions/euphoria/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/translate/provider/GenericTranslatorProviderTest.java
new file mode 100644
index 00000000000..69c74c8234c
--- /dev/null
+++ b/sdks/java/extensions/euphoria/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/translate/provider/GenericTranslatorProviderTest.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.euphoria.core.translate.provider;
+
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.beam.sdk.extensions.euphoria.core.translate.OperatorTranslator;
+import org.apache.beam.sdk.extensions.euphoria.core.translate.provider.ProviderTestUtils.AnyOpTranslator;
+import org.apache.beam.sdk.extensions.euphoria.core.translate.provider.ProviderTestUtils.SecondTestOperator;
+import org.apache.beam.sdk.extensions.euphoria.core.translate.provider.ProviderTestUtils.TestOpTranslator;
+import org.apache.beam.sdk.extensions.euphoria.core.translate.provider.ProviderTestUtils.TestOperator;
+import org.junit.Assert;
+import org.junit.Test;
+
+/** Unit test of {@link GenericTranslatorProvider}. */
+public class GenericTranslatorProviderTest {
+
+ @Test
+ public void testBuild() {
+
+ GenericTranslatorProvider builded = GenericTranslatorProvider.newBuilder().build();
+
+ Assert.assertNotNull(builded);
+ }
+
+ @Test
+ public void testClassToTranslatorRegistration() {
+
+ String translatorName = "translator";
+ GenericTranslatorProvider provider =
+ GenericTranslatorProvider.newBuilder()
+ .register(TestOperator.class, TestOpTranslator.ofName(translatorName))
+ .build();
+ Optional<OperatorTranslator<Void, Void, TestOperator>> maybeTranslator =
+ provider.findTranslator(TestOperator.of());
+
+ ProviderTestUtils.assertTranslator(translatorName, maybeTranslator, TestOpTranslator.class);
+ }
+
+ @Test
+ public void testClassWithPredicateToTranslatorRegistration() {
+
+ String translatorName = "translator";
+ GenericTranslatorProvider provider =
+ GenericTranslatorProvider.newBuilder()
+ .register(TestOperator.class, op -> true, TestOpTranslator.ofName(translatorName))
+ .build();
+
+ Optional<OperatorTranslator<Void, Void, TestOperator>> maybeTranslator =
+ provider.findTranslator(TestOperator.of());
+
+ ProviderTestUtils.assertTranslator(translatorName, maybeTranslator, TestOpTranslator.class);
+ }
+
+ @Test
+ public void testPredicateWithPredicateToTranslatorRegistration() {
+
+ String translatorName = "translator";
+ GenericTranslatorProvider provider =
+ GenericTranslatorProvider.newBuilder()
+ .register(op -> true, AnyOpTranslator.ofName(translatorName))
+ .build();
+
+ Optional<OperatorTranslator<Void, Void, TestOperator>> maybeTranslator =
+ provider.findTranslator(TestOperator.of());
+
+ ProviderTestUtils.assertTranslator(translatorName, maybeTranslator, AnyOpTranslator.class);
+ }
+
+ @Test
+ public void testClassWithPredicateToTranslatorFunction() {
+
+ AtomicBoolean predicateEvalValue = new AtomicBoolean(false);
+
+ String translatorName = "translator";
+ GenericTranslatorProvider provider =
+ GenericTranslatorProvider.newBuilder()
+ .register(
+ TestOperator.class,
+ op -> predicateEvalValue.get(),
+ TestOpTranslator.ofName(translatorName))
+ .build();
+
+ Optional<OperatorTranslator<Void, Void, TestOperator>> maybeTranslator;
+ maybeTranslator = provider.findTranslator(TestOperator.of());
+
+ Assert.assertFalse(maybeTranslator.isPresent());
+
+ predicateEvalValue.set(true);
+ // now predicate will return true and we should get our translator
+ maybeTranslator = provider.findTranslator(TestOperator.of());
+ ProviderTestUtils.assertTranslator(translatorName, maybeTranslator, TestOpTranslator.class);
+
+ // we should not obtain operator for different operator class
+ Optional<OperatorTranslator<Void, Void, SecondTestOperator>> maybeSecondTranslator =
+ provider.findTranslator(SecondTestOperator.of());
+ Assert.assertFalse(maybeSecondTranslator.isPresent());
+ }
+
+ @Test
+ public void testPredicateToTranslatorFunction() {
+
+ AtomicBoolean predicateEvalValue = new AtomicBoolean(false);
+
+ String translatorName = "translator";
+ GenericTranslatorProvider provider =
+ GenericTranslatorProvider.newBuilder()
+ .register(op -> predicateEvalValue.get(), AnyOpTranslator.ofName(translatorName))
+ .build();
+
+ Optional<OperatorTranslator<Void, Void, TestOperator>> maybeTranslator;
+ maybeTranslator = provider.findTranslator(TestOperator.of());
+ Assert.assertFalse(maybeTranslator.isPresent());
+
+ Optional<OperatorTranslator<Void, Void, SecondTestOperator>> maybeSecondTranslator =
+ provider.findTranslator(SecondTestOperator.of());
+ Assert.assertFalse(maybeSecondTranslator.isPresent());
+
+ predicateEvalValue.set(true);
+ // now predicate will return true and we should get our translator
+ maybeTranslator = provider.findTranslator(TestOperator.of());
+ ProviderTestUtils.assertTranslator(translatorName, maybeTranslator, AnyOpTranslator.class);
+
+ // we should get our translator for every operator's type
+ maybeSecondTranslator = provider.findTranslator(SecondTestOperator.of());
+ Assert.assertTrue(maybeSecondTranslator.isPresent());
+ }
+}
diff --git a/sdks/java/extensions/euphoria/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/translate/provider/ProviderTestUtils.java b/sdks/java/extensions/euphoria/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/translate/provider/ProviderTestUtils.java
new file mode 100644
index 00000000000..d55c1b67f03
--- /dev/null
+++ b/sdks/java/extensions/euphoria/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/translate/provider/ProviderTestUtils.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.euphoria.core.translate.provider;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Optional;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.extensions.euphoria.core.client.operator.base.Operator;
+import org.apache.beam.sdk.extensions.euphoria.core.translate.OperatorTranslator;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.TypeDescriptors;
+
+class ProviderTestUtils {
+
+ interface Named {
+ String getName();
+ }
+
+ /** Dummy {@link OperatorTranslator} used for testing purposes. */
+ static class TestOpTranslator implements OperatorTranslator<Void, Void, TestOperator>, Named {
+ private final String name;
+ private final boolean canTranslate;
+
+ TestOpTranslator(String name, boolean canTranslate) {
+ this.name = name;
+ this.canTranslate = canTranslate;
+ }
+
+ static TestOpTranslator ofName(String name) {
+ return new TestOpTranslator(name, true);
+ }
+
+ static TestOpTranslator of(String name, boolean canTranslate) {
+ return new TestOpTranslator(name, canTranslate);
+ }
+
+ @Override
+ public PCollection<Void> translate(TestOperator operator, PCollectionList<Void> inputs) {
+ throw new IllegalStateException("Not meant to actually translate something.");
+ }
+
+ @Override
+ public boolean canTranslate(TestOperator operator) {
+ return canTranslate;
+ }
+
+ @Override
+ public String getName() {
+ return name;
+ }
+ }
+
+ /** Dummy {@link OperatorTranslator} used for testing purposes. */
+ static class AnyOpTranslator implements OperatorTranslator<Void, Void, Operator>, Named {
+ private final String name;
+ private final boolean canTranslate;
+
+ AnyOpTranslator(String name, boolean canTranslate) {
+ this.name = name;
+ this.canTranslate = canTranslate;
+ }
+
+ static AnyOpTranslator ofName(String name) {
+ return new AnyOpTranslator(name, true);
+ }
+
+ static AnyOpTranslator of(String name, boolean canTranslate) {
+ return new AnyOpTranslator(name, canTranslate);
+ }
+
+ @Override
+ public PCollection<Void> translate(Operator operator, PCollectionList<Void> inputs) {
+ throw new IllegalStateException("Not meant to actually translate something.");
+ }
+
+ @Override
+ public boolean canTranslate(Operator operator) {
+ return canTranslate;
+ }
+
+ @Override
+ public String getName() {
+ return name;
+ }
+ }
+
+ /** Dummy {@link Operator} used for testing purposes. */
+ static class TestOperator extends Operator<Void> {
+
+ TestOperator(@Nullable String name) {
+ super(name, TypeDescriptors.voids());
+ }
+
+ static TestOperator of() {
+ return new TestOperator(null);
+ }
+ }
+
+ /** Dummy {@link Operator} used for testing purposes. */
+ static class SecondTestOperator extends Operator<Void> {
+
+ SecondTestOperator(@Nullable String name) {
+ super(name, TypeDescriptors.voids());
+ }
+
+ static SecondTestOperator of() {
+ return new SecondTestOperator(null);
+ }
+ }
+
+ static void assertTranslator(
+ String translatorId,
+ Optional<OperatorTranslator<Void, Void, TestOperator>> maybeTranslator,
+ Class<? extends OperatorTranslator> translatorClass) {
+ assertNotNull(maybeTranslator);
+ assertTrue(maybeTranslator.isPresent());
+ OperatorTranslator<Void, Void, TestOperator> translator = maybeTranslator.get();
+ assertEquals(translator.getClass(), translatorClass);
+
+ Named namedTranslator = (Named) translator;
+ assertEquals(translatorId, namedTranslator.getName());
+ }
+}
diff --git a/website/src/documentation/sdks/euphoria.md b/website/src/documentation/sdks/euphoria.md
index 7d948f8c25f..4212ec6eb80 100644
--- a/website/src/documentation/sdks/euphoria.md
+++ b/website/src/documentation/sdks/euphoria.md
@@ -303,7 +303,7 @@ PCollection<KV<Integer, String>> joined =
// KV(3, "3+rat"), KV(0, "0+null"), KV(4, "4+duck"), KV(3, "3+cat"),
// KV(3, "3+rat"), KV(1, "1+X")]
```
-Euphoria support performance optimization called 'BroadcastHashJoin' for the `LeftJoin`. User can indicate through previous operator's output hint `.output(SizeHint.FITS_IN_MEMORY)` that output `PCollection` of that operator fits in executors memory. And when the `PCollection` is used as right input, Euphoria will automatically translated `LeftJoin` as 'BroadcastHashJoin'. Broadcast join can be very efficient when joining between skewed datasets.
+Euphoria support performance optimization called 'BroadcastHashJoin' for the `LeftJoin`. Broadcast join can be very efficient when joining two datasets where one fits in memory (in `LeftJoin` right dataset has to fit in memory). How to use 'Broadcast Hash Join' is described in [Translation](#Translation) section.
### `RightJoin`
Represents right join of two (left and right) datasets on given key producing single new dataset. Key is extracted from both datasets by separate extractors so elements in left and right can have different types denoted as `LeftT` and `RightT`. The join itself is performed by user-supplied `BinaryFunctor` which consumes one element from both dataset, where left is present optionally, sharing the same key. And outputs result of the join (`OutputT`). The operator emits output dataset of `KV<K, OutputT>` type.
@@ -322,7 +322,7 @@ PCollection<KV<Integer, String>> joined =
// KV(4, "4+duck"), KV(3, "3+cat"), KV(3, "3+rat"), KV(1, "1+X"),
// KV(8, "null+elephant"), KV(5, "null+mouse")]
```
-Euphoria support performance optimization called 'Broadcast Hash Join' for the `RightJoin`. User can indicate through previous operator's output hint `.output(SizeHint.FITS_IN_MEMORY)` that output `PCollection` of that operator fits in executors memory. And when the `PCollection` is used as left input, Euphoria will automatically translated `RightJoin` as 'Broadcast Hash Join'. Broadcast join can be very efficient when joining between skewed datasets.
+Euphoria support performance optimization called 'BroadcastHashJoin' for the `RightJoin`. Broadcast join can be very efficient when joining two datasets where one fits in memory (in `RightJoin` left dataset has to fit in memory). How to use 'Broadcast Hash Join' is described in [Translation](#Translation) section.
### `FullJoin`
Represents full outer join of two (left and right) datasets on given key producing single new dataset. Key is extracted from both datasets by separate extractors so elements in left and right can have different types denoted as `LeftT` and `RightT`. The join itself is performed by user-supplied `BinaryFunctor` which consumes one element from both dataset, where both are present only optionally, sharing the same key. And outputs result of the join (`OutputT`). The operator emits output dataset of `KV<K, OutputT>` type.
@@ -527,17 +527,69 @@ PCollection<SomeEventObject> timeStampedEvents =
//Euphoria will now know event time for each event
```
-## Euphoria To Beam Translation (advanced user section)
-Euphoria API is build on top of Beam Java SDK. The API is transparently translated into Beam's `PTransforms` in background. Most of the translation happens in `org.apache.beam.sdk.extensions.euphoria.core.translate` package. Where the most interesting classes are:
+## Translation
+Euphoria API is build on top of Beam Java SDK. The API is transparently translated into Beam's `PTransforms` in background.
+
+The fact that Euphoria API is translated to Beam Java SDK give us option to fine tune the translation itself. Translation of an `Operator` is realized through implementations of `OperatorTranslator`.
+Euphoria uses `TranslationProvider` to decide which translator should be used. User of Euphoria API can supply its own `OperatorTranslator` through `TranslationProvider` by extending `EuphoriaOptions`.
+Euphoria already contains some useful implementations.
+
+### TranslationProviders
+#### `GenericTranslatorProvider`
+General `TranslationProvider`. Allows for registration of `OperatorTranslator` three different ways:
+* Registration of operator specific translator by operator class.
+* Registration operator specific translator by operator class and additional user defined predicate.
+* Registration of general (not specific to one operator type) translator with user defined predicate.
+Order of registration is important since `GenericTranslatorProvider` returns first suitable translator.
+
+```java
+GenericTranslatorProvider.newBuilder()
+ .register(FlatMap.class, new FlatMapTranslator<>()) // register by operator class
+ .register(
+ Join.class,
+ (Join op) -> {
+ String name = ((Optional<String>) op.getName()).orElse("");
+ return name.toLowerCase().startsWith("broadcast");
+ },
+ new BroadcastHashJoinTranslator<>()) // register by class and predicate
+ .register(
+ op -> op instanceof CompositeOperator,
+ new CompositeOperatorTranslator<>()) // register by predicate only
+ .build();
+```
+
+`GenericTranslatorProvider` is default provider, see `GenericTranslatorProvider.createWithDefaultTranslators()`.
+
+#### `CompositeProvider`
+Implements chaining of `TranslationProvider`s in given order. That in turn allows for composing user defined `TranslationProvider` with already supplied by Euphoria API.
+
+```java
+CompositeProvider.of(
+ CustomTranslatorProvider.of(), // first ask CustomTranslatorProvider for translator
+ GenericTranslatorProvider.createWithDefaultTranslators()); // then ask default provider if needed
+```
+
+### Operator Translators
+Each `Operator` needs to be translated to Java Beam SDK. That is done by implementations of `OperatorTranslator`. Euphoria API contains translator for every `Operator` implementation supplied with it.
+Some operators may have an alternative translations suitable in some cases. `Join` typically may have many implementations. We are describing only the most interesting here.
+
+#### `BroadcastHashJoinTranslator`
+Is able to translate `LeftJoin` and `RightJoin` when whole dataset of one side fits in memory of target executor. So it can be distributed using Beam's side inputs. Resulting in better performance.
+
+#### `CompositeOperatorTranslator`
+Some operators are composite. Meaning that they are in fact wrapped chain of other operators. `CompositeOperatorTranslator` ensures that they are decomposed to elemental operators during translation process.
+
+### Details
+Most of the translation happens in `org.apache.beam.sdk.extensions.euphoria.core.translate` package. Where the most interesting classes are:
* `OperatorTranslator` - Interface which defining inner API of Euphoria to Beam translation.
* `TranslatorProvider` - Way of supplying custom translators.
-* `OperatorTransform` - Which is governing actual translation and/or expansion Euphoria's operators to Beam's `PTransform`
+* `OperatorTransform` - Is governing actual translation and/or expansion Euphoria's operators to Beam's `PTransform`
* `EuphoriaOptions` - A `PipelineOptions`, allows for setting custom `TranslatorProvider`.
-The package also contains implementation of `OperatorTranslator` for each supported operator type (`JoinTranslator`, `FlatMapTranslator`, `ReduceByKeyTranslator`). Not every operator needs to have translator of its own. Some of them can be composed from other operators. That is why operators may implement `CompositeOperator` which give them option to be exanded to set of other Euphoria operators.
+The package also contains implementation of `OperatorTranslator` for each supported operator type (`JoinTranslator`, `FlatMapTranslator`, `ReduceByKeyTranslator`). Not every operator needs to have translator of its own. Some of them can be composed from other operators. That is why operators may implement `CompositeOperator` which give them option to be expanded to set of other Euphoria operators.
The translation process was designed with flexibility in mind. We wanted to allow different ways of translating higher-level Euphoria operators to Beam's SDK's primitives. It allows for further performance optimizations based on user choices or some knowledge about data obtained automatically.
-### Unsupported Features
+## Unsupported Features
[Original Euphoria](https://github.com/seznam/euphoria) contained some features and operators not jet supported in Beam port. List of not yet supported features follows:
* `ReduceByKey` in original Euphoria was allowed to sort output values (per key). This is also not yet translatable into Beam, therefore not supported.
----------------------------------------------------------------
Issue Time Tracking
-------------------
Worklog Id: (was: 165487)
Time Spent: 2h 10m (was: 2h)
> Refactor translator providers
> -----------------------------
>
> Key: BEAM-6054
> URL: https://issues.apache.org/jira/browse/BEAM-6054
> Project: Beam
> Issue Type: Sub-task
> Components: dsl-euphoria
> Reporter: Vaclav Plajt
> Assignee: Vaclav Plajt
> Priority: Major
> Time Spent: 2h 10m
> Remaining Estimate: 0h
>
> We need the following functionality:
> 1. A way of registering translator providers for an operator type with an
> optional user-defined predicate.
> 2. A way of registering translator providers for any operator based on a
> predicate (e.g. `CompositeOperatorTranslator`).
> 3. A way of chaining translator providers.
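
For readers who want to see the three requirements side by side, here is a minimal, self-contained sketch of the same first-match-wins idea. It does not use the Euphoria classes: `ProviderChainSketch`, `Provider`, `Registry` and `chain` are hypothetical names invented for illustration, operators are plain objects and translators are represented by plain strings.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;

/** Hypothetical sketch, not the Euphoria API: translators are just names here. */
public class ProviderChainSketch {

  /** Stand-in for a translator provider: returns a suitable translator name, if any. */
  public interface Provider {
    Optional<String> find(Object operator);
  }

  /** Keeps registrations in insertion order; the first matching registration wins. */
  public static final class Registry implements Provider {
    private final List<Provider> entries = new ArrayList<>();

    /** Requirement 1: register by operator type plus a user-defined predicate. */
    public <T> Registry register(
        Class<T> type, Predicate<? super T> predicate, String translator) {
      entries.add(
          op -> when(type.isInstance(op) && predicate.test(type.cast(op)), translator));
      return this;
    }

    /** Requirement 2: register for any operator, based on a predicate only. */
    public Registry register(Predicate<Object> predicate, String translator) {
      entries.add(op -> when(predicate.test(op), translator));
      return this;
    }

    @Override
    public Optional<String> find(Object operator) {
      for (Provider entry : entries) {
        Optional<String> found = entry.find(operator);
        if (found.isPresent()) {
          return found;
        }
      }
      return Optional.empty();
    }

    private static Optional<String> when(boolean matches, String translator) {
      return matches ? Optional.of(translator) : Optional.empty();
    }
  }

  /** Requirement 3: chain providers; they are asked in the given order. */
  public static Provider chain(Provider... providers) {
    return op -> {
      for (Provider provider : providers) {
        Optional<String> found = provider.find(op);
        if (found.isPresent()) {
          return found;
        }
      }
      return Optional.empty();
    };
  }

  public static void main(String[] args) {
    Provider custom =
        new Registry()
            .register(String.class, s -> s.startsWith("broadcast"), "broadcast-translator");
    Provider defaults = new Registry().register(op -> true, "default-translator");
    Provider chained = chain(custom, defaults);

    System.out.println(chained.find("broadcast-join").orElse("none"));
    System.out.println(chained.find("plain-join").orElse("none"));
  }
}
```

In this sketch the custom registry only answers for strings starting with "broadcast", so everything else falls through to the default registry. That mirrors how the test in the diff routes joins named "broadcast..." to `BroadcastHashJoinTranslator` and leaves the rest to the default translators.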