[
https://issues.apache.org/jira/browse/BEAM-14053?focusedWorklogId=772951&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-772951
]
ASF GitHub Bot logged work on BEAM-14053:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 20/May/22 18:15
Start Date: 20/May/22 18:15
Worklog Time Spent: 10m
Work Description: Lizzfox commented on code in PR #17150:
URL: https://github.com/apache/beam/pull/17150#discussion_r878425402
##########
sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/Plugin.java:
##########
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.cdap;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.reflect.TypeToken;
+import io.cdap.cdap.api.plugin.PluginConfig;
+import io.cdap.cdap.common.lang.InstantiatorFactory;
+import io.cdap.cdap.etl.api.SubmitterLifecycle;
+import io.cdap.cdap.etl.api.batch.BatchSink;
+import io.cdap.cdap.etl.api.batch.BatchSinkContext;
+import io.cdap.cdap.etl.api.batch.BatchSource;
+import io.cdap.cdap.etl.api.batch.BatchSourceContext;
+import java.lang.annotation.Annotation;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.avro.reflect.Nullable;
+import org.apache.beam.sdk.io.cdap.context.BatchContextImpl;
+import org.apache.beam.sdk.io.cdap.context.BatchSinkContextImpl;
+import org.apache.beam.sdk.io.cdap.context.BatchSourceContextImpl;
+import org.apache.hadoop.conf.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Class wrapper for a CDAP plugin. */
+@AutoValue
+@SuppressWarnings({
+ "rawtypes",
+ "unchecked",
+ "assignment.type.incompatible",
+ "UnstableApiUsage",
+ "initialization.fields.uninitialized"
+})
+public abstract class Plugin {
+
+ private static final Logger LOG = LoggerFactory.getLogger(Plugin.class);
+
+ protected @Nullable PluginConfig pluginConfig; // assigned by withConfig(PluginConfig)
+ protected @Nullable Configuration hadoopConfiguration; // created lazily by getHadoopConfiguration()
+ protected @Nullable SubmitterLifecycle cdapPluginObj; // instantiated on first prepareRun() call
+
+ /** Gets the batch context that {@link #prepareRun()} hands to the CDAP plugin object. */
+ public abstract BatchContextImpl getContext();
+
+ /** Gets the CDAP plugin class that {@link #prepareRun()} instantiates on first use. */
+ public abstract Class<?> getPluginClass();
+
+ /** Gets the InputFormat or OutputFormat class registered in the Hadoop configuration. */
+ public abstract Class<?> getFormatClass();
+
+ /** Gets the InputFormatProvider or OutputFormatProvider class for the plugin. */
+ public abstract Class<?> getFormatProviderClass();
+
+ /**
+  * Stores the given CDAP plugin configuration on this wrapper.
+  *
+  * @param pluginConfig configuration object to remember
+  * @return this same instance, so that calls can be chained
+  */
+ public Plugin withConfig(PluginConfig pluginConfig) {
+   this.pluginConfig = pluginConfig;
+   return this;
+ }
+
+ /**
+  * Returns the plugin configuration held by this wrapper.
+  *
+  * @return the stored configuration; may be {@code null}, since the backing field is declared
+  *     nullable and has no initializer
+  */
+ public PluginConfig getPluginConfig() {
+   return this.pluginConfig;
+ }
+
+ /**
+ * Calls {@link SubmitterLifecycle#prepareRun(Object)} method on the {@link
#cdapPluginObj}
+ * passing needed {@param config} configuration object as a parameter. This
method is needed for
+ * validating connection to the CDAP sink/source and performing initial
tuning.
+ */
+ public void prepareRun() {
+ if (cdapPluginObj == null) {
+ InstantiatorFactory instantiatorFactory = new InstantiatorFactory(false);
+ cdapPluginObj =
+ (SubmitterLifecycle)
instantiatorFactory.get(TypeToken.of(getPluginClass())).create();
+ }
+ try {
+ cdapPluginObj.prepareRun(getContext());
+ for (Map.Entry<String, String> entry :
+
getContext().getInputFormatProvider().getInputFormatConfiguration().entrySet())
{
+ getHadoopConfiguration().set(entry.getKey(), entry.getValue());
+ }
+ } catch (Exception e) {
+ LOG.error("Error while prepareRun", e);
+ throw new IllegalStateException("Error while prepareRun");
+ }
+ }
+
+ /**
+  * Registers the format, key and value classes in the plugin's Hadoop configuration.
+  *
+  * <p>The configuration keys come from the Hadoop constants selected by the plugin type, and the
+  * format class is registered against the base class given by the matching format constant.
+  *
+  * @param formatKeyClass class to register under the key-class property
+  * @param formatValueClass class to register under the value-class property
+  * @return this same instance, so that calls can be chained
+  */
+ public Plugin withHadoopConfiguration(Class<?> formatKeyClass, Class<?> formatValueClass) {
+   PluginConstants.Format format = getFormatType();
+   PluginConstants.Hadoop hadoop = getHadoopType();
+   Configuration conf = getHadoopConfiguration();
+
+   conf.setClass(hadoop.getFormatClass(), getFormatClass(), format.getFormatClass());
+   conf.setClass(hadoop.getKeyClass(), formatKeyClass, Object.class);
+   conf.setClass(hadoop.getValueClass(), formatValueClass, Object.class);
+   return this;
+ }
+
+ /**
+  * Replaces the Hadoop configuration used by this plugin wrapper.
+  *
+  * @param hadoopConfiguration configuration instance to use from now on
+  * @return this same instance, so that calls can be chained
+  */
+ public Plugin withHadoopConfiguration(Configuration hadoopConfiguration) {
+   this.hadoopConfiguration = hadoopConfiguration;
+   return this;
+ }
+
+ /**
+  * Returns the Hadoop configuration of this plugin, creating one on first access.
+  *
+  * @return the stored configuration; when none is set yet, a new {@code Configuration(false)} is
+  *     created, stored and returned
+  */
+ public Configuration getHadoopConfiguration() {
+   if (hadoopConfiguration != null) {
+     return hadoopConfiguration;
+   }
+   hadoopConfiguration = new Configuration(false);
+   return hadoopConfiguration;
+ }
+
+ /** Gets the plugin type (source or sink), which selects the format and Hadoop constants used. */
+ public abstract PluginConstants.PluginType getPluginType();
+
+ /**
+  * Maps the plugin type to its format constant.
+  *
+  * @return {@code INPUT} for a source plugin, {@code OUTPUT} for any other plugin type
+  */
+ private PluginConstants.Format getFormatType() {
+   if (getPluginType() == PluginConstants.PluginType.SOURCE) {
+     return PluginConstants.Format.INPUT;
+   }
+   return PluginConstants.Format.OUTPUT;
+ }
+
+ /**
+  * Maps the plugin type to its Hadoop constant.
+  *
+  * @return {@code SOURCE} for a source plugin, {@code SINK} for any other plugin type
+  */
+ private PluginConstants.Hadoop getHadoopType() {
+   if (getPluginType() == PluginConstants.PluginType.SOURCE) {
+     return PluginConstants.Hadoop.SOURCE;
+   }
+   return PluginConstants.Hadoop.SINK;
+ }
+
+ /** Gets value of a plugin type. */
+ public static PluginConstants.PluginType initPluginType(Class<?> pluginClass)
+ throws IllegalArgumentException {
+ if (BatchSource.class.isAssignableFrom(pluginClass)) {
+ return PluginConstants.PluginType.SOURCE;
+ } else if (BatchSink.class.isAssignableFrom(pluginClass)) {
+ return PluginConstants.PluginType.SINK;
+ } else {
+ throw new IllegalArgumentException("Provided class should be source or
sink plugin");
+ }
+ }
+
+ /**
+  * Creates the batch context that matches the parameter type of the plugin's {@code prepareRun}
+  * method.
+  *
+  * <p>Methods declared by {@code cdapPluginClass} are examined before those declared by its direct
+  * superclass, and the first suitable one wins. Compiler-generated bridge or synthetic methods and
+  * {@code prepareRun} overloads that do not take exactly one parameter are ignored, because their
+  * parameter types do not identify the context and reflection returns methods in no particular
+  * order.
+  *
+  * @param cdapPluginClass CDAP plugin class to inspect
+  * @return a {@link BatchSinkContextImpl} when {@code prepareRun} takes a {@link
+  *     BatchSinkContext}; otherwise a {@link BatchSourceContextImpl}
+  * @throws IllegalStateException if no suitable {@code prepareRun} method is found
+  */
+ public static BatchContextImpl initContext(Class<?> cdapPluginClass) {
+   List<Method> methods = new ArrayList<>(Arrays.asList(cdapPluginClass.getDeclaredMethods()));
+   if (cdapPluginClass.getSuperclass() != null) {
+     methods.addAll(Arrays.asList(cdapPluginClass.getSuperclass().getDeclaredMethods()));
+   }
+
+   Class<?> contextClass = null;
+   for (Method method : methods) {
+     if (!method.getName().equals("prepareRun")
+         || method.isBridge()
+         || method.isSynthetic()
+         || method.getParameterCount() != 1) {
+       continue;
+     }
+     contextClass = method.getParameterTypes()[0];
+     break;
+   }
+   if (contextClass == null) {
+     throw new IllegalStateException("Cannot determine context class");
+   }
+
+   // Any parameter type other than BatchSinkContext falls back to the source context.
+   if (contextClass.equals(BatchSinkContext.class)) {
+     return new BatchSinkContextImpl();
+   }
+   return new BatchSourceContextImpl();
+ }
+
+ /** Gets value of a plugin type. */
+ public Boolean isUnbounded() {
+ Boolean isUnbounded = null;
+
+ for (Annotation annotation : getPluginClass().getDeclaredAnnotations()) {
+ if
(annotation.annotationType().equals(io.cdap.cdap.api.annotation.Plugin.class)) {
Review Comment:
Thank you, that's helpful. Done
Issue Time Tracking
-------------------
Worklog Id: (was: 772951)
Time Spent: 1h 50m (was: 1h 40m)
> [CdapIO] Design and implement generic plugin class
> ---------------------------------------------------
>
> Key: BEAM-14053
> URL: https://issues.apache.org/jira/browse/BEAM-14053
> Project: Beam
> Issue Type: Task
> Components: io-java-cdap
> Reporter: Elizaveta Lomteva
> Assignee: Ekaterina Tatanova
> Priority: P2
> Labels: cdap-io-sprint-2
> Time Spent: 1h 50m
> Remaining Estimate: 0h
>
> h3. Context:
> CDAP plugins include a bunch of classes to ensure the execution flow, such as
> Input/Output-FormatProvider, Input/Output-Format (e.g.
> [SalesforceInputFormat|https://github.com/data-integrations/salesforce/blob/develop/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceInputFormat.java],
>
> [SalesforceInputFormatProvider|https://github.com/data-integrations/salesforce/blob/develop/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceInputFormatProvider.java]).
> The goal is to allow the developer of the Beam connector (in {{cdapio}}
> package) to use a construct similar to
> {{cdapio.<pluginName>.withConfig(config).build()}} (pseudocode), which would
> return a plugin wrapper with read or write methods.
> h3. Task Description:
> Design wrapper class(es) or interface(s) for the CDAP plugin so
> that those classes (interfaces) encapsulate the functionality of the CDAP
> InputFormat and InputFormatProvider classes.
> h3. Acceptance criteria:
> Design and source code of class(es)/interface(s) that will provide the
> functionality of CDAP plugin classes (e.g.
> [SalesforceInputFormatProvider|https://github.com/data-integrations/salesforce/blob/develop/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceInputFormatProvider.java]).
--
This message was sent by Atlassian Jira
(v8.20.7#820007)