[ 
https://issues.apache.org/jira/browse/BEAM-6054?focusedWorklogId=165435&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165435
 ]

ASF GitHub Bot logged work on BEAM-6054:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 13/Nov/18 13:52
            Start Date: 13/Nov/18 13:52
    Worklog Time Spent: 10m 
      Work Description: VaclavPlajt commented on a change in pull request 
#7019: [BEAM-6054] Euphoria translation providers refactored.
URL: https://github.com/apache/beam/pull/7019#discussion_r233044884
 
 

 ##########
 File path: 
sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/translate/provider/GenericTranslatorProvider.java
 ##########
 @@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.euphoria.core.translate.provider;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Predicate;
+import 
org.apache.beam.sdk.extensions.euphoria.core.client.operator.CompositeOperator;
+import org.apache.beam.sdk.extensions.euphoria.core.client.operator.FlatMap;
+import org.apache.beam.sdk.extensions.euphoria.core.client.operator.Join;
+import 
org.apache.beam.sdk.extensions.euphoria.core.client.operator.ReduceByKey;
+import org.apache.beam.sdk.extensions.euphoria.core.client.operator.Union;
+import 
org.apache.beam.sdk.extensions.euphoria.core.client.operator.base.Operator;
+import 
org.apache.beam.sdk.extensions.euphoria.core.translate.CompositeOperatorTranslator;
+import 
org.apache.beam.sdk.extensions.euphoria.core.translate.FlatMapTranslator;
+import org.apache.beam.sdk.extensions.euphoria.core.translate.JoinTranslator;
+import 
org.apache.beam.sdk.extensions.euphoria.core.translate.OperatorTranslator;
+import 
org.apache.beam.sdk.extensions.euphoria.core.translate.ReduceByKeyTranslator;
+import 
org.apache.beam.sdk.extensions.euphoria.core.translate.TranslatorProvider;
+import org.apache.beam.sdk.extensions.euphoria.core.translate.UnionTranslator;
+
+/**
+ * Adjustable {@link TranslatorProvider} that selects first suitable 
translation for the registered
+ * operator.
+ *
+ * <p>{@link OperatorTranslator Translators} can be added by calling variants 
of {@link
+ * GenericTranslatorProvider.Builder#register(Class, OperatorTranslator) 
register} method during
+ * build. Order of registration is important. Building is started by {@link 
#newBuilder()}.
+ */
+public class GenericTranslatorProvider implements TranslatorProvider {
+
+  private final List<TranslationDescriptor> possibleTranslators;
+
+  private GenericTranslatorProvider(List<TranslationDescriptor> 
possibleTranslators) {
+    this.possibleTranslators = possibleTranslators;
+  }
+
+  public static GenericTranslatorProvider createWithDefaultTranslators() {
+    return GenericTranslatorProvider.newBuilder()
+        .register(FlatMap.class, new FlatMapTranslator<>())
+        .register(Union.class, new UnionTranslator<>())
+        .register(ReduceByKey.class, new ReduceByKeyTranslator<>())
+        .register(Join.class, new JoinTranslator<>())
+        // register fallback operator translator to decompose composit 
operators
+        .register(op -> op instanceof CompositeOperator, new 
CompositeOperatorTranslator<>())
+        .build();
+  }
+
+  @Override
+  public <InputT, OutputT, OperatorT extends Operator<OutputT>>
+      Optional<OperatorTranslator<InputT, OutputT, OperatorT>> 
findTranslator(OperatorT operator) {
+
+    for (TranslationDescriptor descriptor : possibleTranslators) {
+
+      @SuppressWarnings("unchecked")
+      Optional<OperatorTranslator<InputT, OutputT, OperatorT>> maybeTranslator 
=
+          descriptor.getTranslatorWhenSuitable(operator);
+
+      if (maybeTranslator.isPresent()) {
+        return maybeTranslator;
+      }
+    }
+
+    return Optional.empty();
+  }
+
+  /**
+   * Create a new builder for provider.
+   *
+   * @return builder
+   */
+  public static Builder newBuilder() {
+    return new Builder();
+  }
+
+  /** {@link GenericTranslatorProvider} builder. */
+  public static class Builder {
+
+    private final List<TranslationDescriptor> possibleTranslators = new 
ArrayList<>();
+
+    private Builder() {}
+
+    /**
+     * Registers given {@link OperatorTranslator} to be used for given 
operator type.
+     *
+     * @param clazz class of an {@link Operator} to be translated
+     * @param operatorTranslator translator to register
+     * @param <OperatorT> type of an {@link Operator} to be translated
+     * @return this builder
+     */
+    public <OperatorT extends Operator<?>> Builder register(
+        Class<OperatorT> clazz, OperatorTranslator<?, ?, ? extends OperatorT> 
operatorTranslator) {
+      possibleTranslators.add(TranslationDescriptor.of(clazz, 
operatorTranslator));
+      return this;
+    }
+
+    /**
+     * Register given {@link OperatorTranslator} to be used for operator type 
when given {@link
+     * Predicate} holds.
+     *
+     * @param clazz class of an {@link Operator} to be translated
+     * @param predicate user defined predicate which is tested to be true in 
order to apply
+     *     translator to an {@link Operator}
+     * @param operatorTranslator translator to register
+     * @param <OperatorT> type of an {@link Operator} to be translated
+     * @return this builder
+     */
+    public <OperatorT extends Operator<?>> Builder register(
+        Class<OperatorT> clazz,
+        Predicate<OperatorT> predicate,
+        OperatorTranslator<?, ?, ? extends OperatorT> operatorTranslator) {
+      possibleTranslators.add(TranslationDescriptor.of(clazz, predicate, 
operatorTranslator));
+      return this;
+    }
+
+    /**
+     * Registers given {@link OperatorTranslator} to be used for any operator 
when given {@link
+     * Predicate} holds.
+     *
+     * @param predicate user defined predicate which is tested to be true in 
order to apply
+     *     translator to an {@link Operator}. Note that predicate have to be 
able to test any {@link
+     *     Operator} instance including subtypes.
+     * @param operatorTranslator translator to register
+     * @return this builder
+     */
+    public Builder register(
+        Predicate<Operator> predicate, OperatorTranslator<?, ?, Operator> 
operatorTranslator) {
+      possibleTranslators.add(TranslationDescriptor.of(predicate, 
operatorTranslator));
+      return this;
+    }
+
+    public GenericTranslatorProvider build() {
+      return new GenericTranslatorProvider(possibleTranslators);
+    }
+  }
+
+  /**
+   * Container for optional {@link Predicate user defined predicate}, optional 
{@link Class} of na
+   * operator to be translated and {@link OperatorTranslator} itself. The 
predicate and operator
+   * class defines an optional checks. At least one of them have to be present.
+   *
+   * <p>The {@link OperatorTranslator} is allowed to translate an operator iff 
it pass all the
+   * checks and {@link OperatorTranslator#canTranslate(Operator) can 
translate} given operator. That
+   * allows users to write translators specific for any operator.
+   *
+   * @param <OperatorT> the type of the euphoria operator
+   */
+  private static class TranslationDescriptor<OperatorT extends Operator<?>> {
+
+    /** Class of an {@link Operator} given {@link #translator} can be applied 
on. */
+    private final Optional<Class<OperatorT>> operatorClass;
+
+    /**
+     * User specified predicate, which determines whenever given {@link 
#translator} can be used to
+     * translate an operator.
+     */
+    private final Optional<Predicate<OperatorT>> userDefinedPredicate;
+
+    private final OperatorTranslator<?, ?, OperatorT> translator;
+
+    private TranslationDescriptor(
+        Optional<Class<OperatorT>> operatorClass,
+        Optional<Predicate<OperatorT>> userDefinedPredicate,
+        OperatorTranslator<?, ?, ? extends OperatorT> translator) {
+      Preconditions.checkState(
+          operatorClass.isPresent() || userDefinedPredicate.isPresent(),
+          "At least user defined predicate or class of an operator have to be 
given.");
+
+      @SuppressWarnings("unchecked")
+      OperatorTranslator<?, ?, OperatorT> castedTranslator =
+          (OperatorTranslator<?, ?, OperatorT>) translator;
+
+      this.operatorClass = operatorClass;
+      this.userDefinedPredicate = userDefinedPredicate;
+      this.translator = castedTranslator;
+    }
+
+    static <OperatorT extends Operator<?>> TranslationDescriptor<OperatorT> of(
+        Class<OperatorT> operatorClass,
+        Predicate<OperatorT> userDefinedPredicate,
+        OperatorTranslator<?, ?, ? extends OperatorT> translator) {
+      return new TranslationDescriptor<>(
+          Optional.of(requireNonNull(operatorClass)),
+          Optional.of(requireNonNull(userDefinedPredicate)),
+          requireNonNull(translator));
+    }
+
+    static <OperatorT extends Operator<?>> TranslationDescriptor<OperatorT> of(
+        Predicate<OperatorT> userDefinedPredicate,
+        OperatorTranslator<?, ?, ? extends OperatorT> translator) {
+      return new TranslationDescriptor<>(
+          Optional.empty(),
+          Optional.of(requireNonNull(userDefinedPredicate)),
 
 Review comment:
   fixed

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 165435)
    Time Spent: 1h 50m  (was: 1h 40m)

> Refactor traslator providers 
> -----------------------------
>
>                 Key: BEAM-6054
>                 URL: https://issues.apache.org/jira/browse/BEAM-6054
>             Project: Beam
>          Issue Type: Sub-task
>          Components: dsl-euphoria
>            Reporter: Vaclav Plajt
>            Assignee: Vaclav Plajt
>            Priority: Major
>          Time Spent: 1h 50m
>  Remaining Estimate: 0h
>
> We need following functionality:
> 1. Way of registering translator providers for operator type with optional 
> user-defined predicate
> 1. Way of registering translator providers for any operator based on 
> predicate. (e.g `CompositeOperatorTranslator`)
> 1. Way of chaining translator providers



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to