[ 
https://issues.apache.org/jira/browse/BEAM-14053?focusedWorklogId=772951&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-772951
 ]

ASF GitHub Bot logged work on BEAM-14053:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 20/May/22 18:15
            Start Date: 20/May/22 18:15
    Worklog Time Spent: 10m 
      Work Description: Lizzfox commented on code in PR #17150:
URL: https://github.com/apache/beam/pull/17150#discussion_r878425402


##########
sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/Plugin.java:
##########
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.cdap;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.reflect.TypeToken;
+import io.cdap.cdap.api.plugin.PluginConfig;
+import io.cdap.cdap.common.lang.InstantiatorFactory;
+import io.cdap.cdap.etl.api.SubmitterLifecycle;
+import io.cdap.cdap.etl.api.batch.BatchSink;
+import io.cdap.cdap.etl.api.batch.BatchSinkContext;
+import io.cdap.cdap.etl.api.batch.BatchSource;
+import io.cdap.cdap.etl.api.batch.BatchSourceContext;
+import java.lang.annotation.Annotation;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.avro.reflect.Nullable;
+import org.apache.beam.sdk.io.cdap.context.BatchContextImpl;
+import org.apache.beam.sdk.io.cdap.context.BatchSinkContextImpl;
+import org.apache.beam.sdk.io.cdap.context.BatchSourceContextImpl;
+import org.apache.hadoop.conf.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Class wrapper for a CDAP plugin. */
+@AutoValue
+@SuppressWarnings({
+  "rawtypes",
+  "unchecked",
+  "assignment.type.incompatible",
+  "UnstableApiUsage",
+  "initialization.fields.uninitialized"
+})
+public abstract class Plugin {
+
+  private static final Logger LOG = LoggerFactory.getLogger(Plugin.class);
+
+  protected @Nullable PluginConfig pluginConfig;
+  protected @Nullable Configuration hadoopConfiguration;
+  protected @Nullable SubmitterLifecycle cdapPluginObj;
+
+  /** Gets the context of a plugin. */
+  public abstract BatchContextImpl getContext();
+
+  /** Gets the main class of a plugin. */
+  public abstract Class<?> getPluginClass();
+
+  /** Gets InputFormat or OutputFormat class for a plugin. */
+  public abstract Class<?> getFormatClass();
+
+  /** Gets InputFormatProvider or OutputFormatProvider class for a plugin. */
+  public abstract Class<?> getFormatProviderClass();
+
+  /** Sets a plugin config. */
+  public Plugin withConfig(PluginConfig pluginConfig) {
+    this.pluginConfig = pluginConfig;
+    return this;
+  }
+
+  /** Gets a plugin config. */
+  public PluginConfig getPluginConfig() {
+    return pluginConfig;
+  }
+
+  /**
+   * Calls {@link SubmitterLifecycle#prepareRun(Object)} method on the {@link 
#cdapPluginObj}
+   * passing needed {@param config} configuration object as a parameter. This 
method is needed for
+   * validating connection to the CDAP sink/source and performing initial 
tuning.
+   */
+  public void prepareRun() {
+    if (cdapPluginObj == null) {
+      InstantiatorFactory instantiatorFactory = new InstantiatorFactory(false);
+      cdapPluginObj =
+          (SubmitterLifecycle) 
instantiatorFactory.get(TypeToken.of(getPluginClass())).create();
+    }
+    try {
+      cdapPluginObj.prepareRun(getContext());
+      for (Map.Entry<String, String> entry :
+          
getContext().getInputFormatProvider().getInputFormatConfiguration().entrySet()) 
{
+        getHadoopConfiguration().set(entry.getKey(), entry.getValue());
+      }
+    } catch (Exception e) {
+      LOG.error("Error while prepareRun", e);
+      throw new IllegalStateException("Error while prepareRun");
+    }
+  }
+
+  /** Sets a plugin Hadoop configuration. */
+  public Plugin withHadoopConfiguration(Class<?> formatKeyClass, Class<?> 
formatValueClass) {
+    PluginConstants.Format formatType = getFormatType();
+    PluginConstants.Hadoop hadoopType = getHadoopType();
+
+    getHadoopConfiguration()
+        .setClass(hadoopType.getFormatClass(), getFormatClass(), 
formatType.getFormatClass());
+    getHadoopConfiguration().setClass(hadoopType.getKeyClass(), 
formatKeyClass, Object.class);
+    getHadoopConfiguration().setClass(hadoopType.getValueClass(), 
formatValueClass, Object.class);
+
+    return this;
+  }
+
+  /** Sets a plugin Hadoop configuration. */
+  public Plugin withHadoopConfiguration(Configuration hadoopConfiguration) {
+    this.hadoopConfiguration = hadoopConfiguration;
+
+    return this;
+  }
+
+  /** Gets a plugin Hadoop configuration. */
+  public Configuration getHadoopConfiguration() {
+    if (hadoopConfiguration == null) {
+      hadoopConfiguration = new Configuration(false);
+    }
+    return hadoopConfiguration;
+  }
+
+  /** Gets a plugin type. */
+  public abstract PluginConstants.PluginType getPluginType();
+
+  /** Gets a format type. */
+  private PluginConstants.Format getFormatType() {
+    return getPluginType() == PluginConstants.PluginType.SOURCE
+        ? PluginConstants.Format.INPUT
+        : PluginConstants.Format.OUTPUT;
+  }
+
+  /** Gets a Hadoop type. */
+  private PluginConstants.Hadoop getHadoopType() {
+    return getPluginType() == PluginConstants.PluginType.SOURCE
+        ? PluginConstants.Hadoop.SOURCE
+        : PluginConstants.Hadoop.SINK;
+  }
+
+  /** Gets value of a plugin type. */
+  public static PluginConstants.PluginType initPluginType(Class<?> pluginClass)
+      throws IllegalArgumentException {
+    if (BatchSource.class.isAssignableFrom(pluginClass)) {
+      return PluginConstants.PluginType.SOURCE;
+    } else if (BatchSink.class.isAssignableFrom(pluginClass)) {
+      return PluginConstants.PluginType.SINK;
+    } else {
+      throw new IllegalArgumentException("Provided class should be source or 
sink plugin");
+    }
+  }
+
+  public static BatchContextImpl initContext(Class<?> cdapPluginClass) {
+    // Init context and determine input or output
+    Class<?> contextClass = null;
+    List<Method> methods = new 
ArrayList<>(Arrays.asList(cdapPluginClass.getDeclaredMethods()));
+    if (cdapPluginClass.getSuperclass() != null) {
+      
methods.addAll(Arrays.asList(cdapPluginClass.getSuperclass().getDeclaredMethods()));
+    }
+    for (Method method : methods) {
+      if (method.getName().equals("prepareRun")) {
+        contextClass = method.getParameterTypes()[0];
+      }
+    }
+    if (contextClass == null) {
+      throw new IllegalStateException("Cannot determine context class");
+    }
+
+    if (contextClass.equals(BatchSourceContext.class)) {
+      return new BatchSourceContextImpl();
+    } else if (contextClass.equals(BatchSinkContext.class)) {
+      return new BatchSinkContextImpl();
+    } else {
+      return new BatchSourceContextImpl();
+    }
+  }
+
+  /** Gets value of a plugin type. */
+  public Boolean isUnbounded() {
+    Boolean isUnbounded = null;
+
+    for (Annotation annotation : getPluginClass().getDeclaredAnnotations()) {
+      if 
(annotation.annotationType().equals(io.cdap.cdap.api.annotation.Plugin.class)) {

Review Comment:
   Thank you, that's helpful. Done





Issue Time Tracking
-------------------

    Worklog Id:     (was: 772951)
    Time Spent: 1h 50m  (was: 1h 40m)

> [CdapIO] Design and implement generic plugin class 
> ---------------------------------------------------
>
>                 Key: BEAM-14053
>                 URL: https://issues.apache.org/jira/browse/BEAM-14053
>             Project: Beam
>          Issue Type: Task
>          Components: io-java-cdap
>            Reporter: Elizaveta Lomteva
>            Assignee: Ekaterina Tatanova
>            Priority: P2
>              Labels: cdap-io-sprint-2
>          Time Spent: 1h 50m
>  Remaining Estimate: 0h
>
> h3. Context:
> CDAP plugins include several classes that drive the execution flow, such as 
> Input/Output-FormatProvider and Input/Output-Format (e.g. 
> [SalesforceInputFormat|https://github.com/data-integrations/salesforce/blob/develop/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceInputFormat.java],
> [SalesforceInputFormatProvider|https://github.com/data-integrations/salesforce/blob/develop/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceInputFormatProvider.java]).
>  The goal is to allow developers of the Beam connector (in the {{cdapio}} 
> package) to use a construct similar to 
> {{cdapio.<pluginName>.withConfig(config).build()}} (pseudocode), which would 
> return a plugin wrapper with read or write methods.
> h3. Task Description:
> Design wrapper class(es) or interface(s) for CDAP plugins that encapsulate 
> the functionality of the CDAP InputFormat and InputFormatProvider classes.
> h3. Acceptance criteria:
> Design and source code of the class(es)/interface(s) that provide the 
> functionality of CDAP plugin classes (e.g. 
> [SalesforceInputFormatProvider|https://github.com/data-integrations/salesforce/blob/develop/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceInputFormatProvider.java]).



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

Reply via email to