This is an automated email from the ASF dual-hosted git repository.

johncasey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 4886bdf08fd [CdapIO] Complete examples for CDAP Zendesk plugins 
(#24589)
4886bdf08fd is described below

commit 4886bdf08fd057a0f8a4e99ab30453089654188d
Author: Vitaly Terentyev <[email protected]>
AuthorDate: Fri Dec 16 01:59:58 2022 +0400

    [CdapIO] Complete examples for CDAP Zendesk plugins (#24589)
    
    * Add examples for Cdap Zendesk plugins
    
    * Move common classes to Examples Cdap module
    
    * Fix readme
---
 examples/java/cdap/README.md                       |   2 +-
 examples/java/cdap/zendesk/build.gradle            | 106 ++++++++++++++++
 .../complete/cdap/zendesk/CdapZendeskToTxt.java    | 140 +++++++++++++++++++++
 .../beam/examples/complete/cdap/zendesk/README.md  |  55 ++++++++
 .../cdap/zendesk/options/CdapZendeskOptions.java   |  99 +++++++++++++++
 .../cdap/zendesk/options/package-info.java         |  20 +++
 .../complete/cdap/zendesk/package-info.java        |  20 +++
 .../zendesk/transforms/FormatInputTransform.java   |  54 ++++++++
 .../cdap/zendesk/transforms/package-info.java      |  20 +++
 .../utils/PluginConfigOptionsConverter.java        |  51 ++++++++
 .../complete/cdap/zendesk/utils/package-info.java  |  20 +++
 settings.gradle.kts                                |   1 +
 12 files changed, 587 insertions(+), 1 deletion(-)

diff --git a/examples/java/cdap/README.md b/examples/java/cdap/README.md
index 6cf86335cc7..c6ffc858d88 100644
--- a/examples/java/cdap/README.md
+++ b/examples/java/cdap/README.md
@@ -24,4 +24,4 @@ Supported CDAP plugins:
 - [ServiceNow](https://github.com/data-integrations/servicenow-plugins). More 
info in the ServiceNow example 
[README](servicenow/src/main/java/org/apache/beam/examples/complete/cdap/servicenow/README.md).
 - [Salesforce](https://github.com/data-integrations/salesforce)
 - [Hubspot](https://github.com/data-integrations/hubspot)
-- [Zendesk](https://github.com/data-integrations/zendesk)
+- [Zendesk](https://github.com/data-integrations/zendesk). More info in the 
Zendesk example 
[README](zendesk/src/main/java/org/apache/beam/examples/complete/cdap/zendesk/README.md).
diff --git a/examples/java/cdap/zendesk/build.gradle 
b/examples/java/cdap/zendesk/build.gradle
new file mode 100644
index 00000000000..277d0761fc3
--- /dev/null
+++ b/examples/java/cdap/zendesk/build.gradle
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import groovy.json.JsonOutput
+
+plugins {
+    id 'java'
+    id 'org.apache.beam.module'
+    id 'com.github.johnrengelman.shadow'
+}
+
+applyJavaNature(
+        exportJavadoc: false,
+        automaticModuleName: 'org.apache.beam.examples.complete.cdap.zendesk',
+)
+
+description = "Apache Beam :: Examples :: Java :: CDAP :: Zendesk"
+ext.summary = """Apache Beam SDK provides a simple, Java-based
+interface for processing virtually any size data. This
+artifact includes CDAP Zendesk Apache Beam Java SDK examples."""
+
+/** Define the list of runners which execute a precommit test.
+ * Some runners are run from separate projects, see the preCommit task below
+ * for details.
+ */
+def preCommitRunners = ["directRunner", "flinkRunner"]
+for (String runner : preCommitRunners) {
+    configurations.create(runner + "PreCommit")
+}
+
+dependencies {
+    implementation 
enforcedPlatform(library.java.google_cloud_platform_libraries_bom)
+    implementation project(path: ":sdks:java:core", configuration: "shadow")
+    implementation project(":examples:java:cdap")
+    implementation project(":sdks:java:io:cdap")
+    implementation project(":sdks:java:io:hadoop-common")
+    implementation library.java.cdap_api
+    implementation library.java.cdap_api_commons
+    implementation library.java.cdap_etl_api
+    permitUnusedDeclared library.java.cdap_etl_api
+    implementation library.java.cdap_hydrator_common
+    implementation library.java.cdap_plugin_zendesk
+    implementation library.java.hadoop_common
+    implementation library.java.slf4j_api
+    implementation library.java.vendored_guava_26_0_jre
+    runtimeOnly project(path: ":runners:direct-java", configuration: "shadow")
+
+    // Add dependencies for the PreCommit configurations
+    // For each runner a project level dependency on the examples project.
+    for (String runner : preCommitRunners) {
+        delegate.add(runner + "PreCommit", 
project(":examples:java:cdap:zendesk"))
+        delegate.add(runner + "PreCommit", project(path: 
":examples:java:cdap:zendesk", configuration: "testRuntimeMigration"))
+    }
+    directRunnerPreCommit project(path: ":runners:direct-java", configuration: 
"shadow")
+    flinkRunnerPreCommit 
project(":runners:flink:${project.ext.latestFlinkVersion}")
+}
+
+/*
+ * Create a ${runner}PreCommit task for each runner which runs the tests on
+ * the ${runner}PreCommit classpath with that runner set in beamTestPipelineOptions.
+ */
+def preCommitRunnerClass = [
+        directRunner: "org.apache.beam.runners.direct.DirectRunner",
+        flinkRunner: "org.apache.beam.runners.flink.TestFlinkRunner"
+]
+
+for (String runner : preCommitRunners) {
+    tasks.create(name: runner + "PreCommit", type: Test) {
+        def preCommitBeamTestPipelineOptions = [
+                "--runner=" + preCommitRunnerClass[runner],
+        ]
+        classpath = configurations."${runner}PreCommit"
+        forkEvery 1
+        maxParallelForks 4
+        systemProperty "beamTestPipelineOptions", 
JsonOutput.toJson(preCommitBeamTestPipelineOptions)
+    }
+}
+
+/* Define a common precommit task which depends on all the individual 
precommits. */
+task preCommit() {
+    for (String runner : preCommitRunners) {
+        dependsOn runner + "PreCommit"
+    }
+}
+
+task executeCdap (type:JavaExec) {
+    mainClass = System.getProperty("mainClass")
+    classpath = sourceSets.main.runtimeClasspath
+    systemProperties System.getProperties()
+    args System.getProperty("exec.args", "").split()
+}
diff --git 
a/examples/java/cdap/zendesk/src/main/java/org/apache/beam/examples/complete/cdap/zendesk/CdapZendeskToTxt.java
 
b/examples/java/cdap/zendesk/src/main/java/org/apache/beam/examples/complete/cdap/zendesk/CdapZendeskToTxt.java
new file mode 100644
index 00000000000..ce8f41e3e91
--- /dev/null
+++ 
b/examples/java/cdap/zendesk/src/main/java/org/apache/beam/examples/complete/cdap/zendesk/CdapZendeskToTxt.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.complete.cdap.zendesk;
+
+import static 
org.apache.beam.examples.complete.cdap.zendesk.transforms.FormatInputTransform.readFromCdapZendesk;
+
+import io.cdap.cdap.api.data.format.StructuredRecord;
+import java.util.Map;
+import org.apache.beam.examples.complete.cdap.utils.StructuredRecordUtils;
+import 
org.apache.beam.examples.complete.cdap.zendesk.options.CdapZendeskOptions;
+import 
org.apache.beam.examples.complete.cdap.zendesk.utils.PluginConfigOptionsConverter;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.NullableCoder;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.io.hadoop.WritableCoder;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.MapValues;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.hadoop.io.NullWritable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The {@link CdapZendeskToTxt} pipeline is a batch pipeline which ingests data in JSON format from
+ * CDAP Zendesk, and outputs the resulting records to .txt files. Zendesk parameters and output txt
+ * file path are specified by the user as template parameters. <br>
+ *
+ * <p><b>Example Usage</b>
+ *
+ * <pre>
+ * # Gradle preparation
+ *
+ * To run this example your {@code build.gradle} file should contain the following task
+ * to execute the pipeline:
+ * {@code
+ * task executeCdap (type:JavaExec) {
+ *     mainClass = System.getProperty("mainClass")
+ *     classpath = sourceSets.main.runtimeClasspath
+ *     systemProperties System.getProperties()
+ *     args System.getProperty("exec.args", "").split()
+ * }
+ * }
+ *
+ * This task runs the pipeline via the following command:
+ * {@code
+ * gradle clean executeCdap -DmainClass=org.apache.beam.examples.complete.cdap.zendesk.CdapZendeskToTxt \
+ *      -Dexec.args="--<argument>=<value> --<argument>=<value>"
+ * }
+ *
+ * # Running the pipeline
+ * To execute this pipeline, specify the parameters in the following format:
+ * {@code
+ * --zendeskBaseUrl=your-url \
+ * --adminEmail=your-email \
+ * --apiToken=your-token \
+ * --objectsToPull=Groups \
+ * --referenceName=your-reference-name \
+ * --outputTxtFilePathPrefix=your-path-to-output-folder-with-filename-prefix
+ * }
+ *
+ * By default this will run the pipeline locally with the DirectRunner. To change the runner, specify:
+ * {@code
+ * --runner=YOUR_SELECTED_RUNNER
+ * }
+ * </pre>
+ */
+public class CdapZendeskToTxt {
+
+  /* Logger for this class. */
+  private static final Logger LOG = LoggerFactory.getLogger(CdapZendeskToTxt.class);
+
+  /**
+   * Main entry point for pipeline execution.
+   *
+   * @param args Command line arguments to the pipeline.
+   */
+  public static void main(String[] args) {
+    CdapZendeskOptions options =
+        PipelineOptionsFactory.fromArgs(args).withValidation().as(CdapZendeskOptions.class);
+
+    // Create the pipeline
+    Pipeline pipeline = Pipeline.create(options);
+    run(pipeline, options);
+  }
+
+  /**
+   * Applies the Zendesk read, format and .txt write steps to {@code pipeline} and runs it.
+   *
+   * @param options arguments to the pipeline; the API token and admin email are never logged
+   */
+  public static PipelineResult run(Pipeline pipeline, CdapZendeskOptions options) {
+    Map<String, Object> pluginConfigParams =
+        PluginConfigOptionsConverter.zendeskOptionsToParamsMap(options);
+    LOG.info("Starting Cdap-Zendesk-To-Txt pipeline for objects: {}", options.getObjectsToPull());
+
+    /*
+     * Steps:
+     *  1) Read messages in from Cdap Zendesk
+     *  2) Extract values only
+     *  3) Write successful records to .txt file
+     */
+
+    pipeline
+        .apply("readFromCdapZendesk", readFromCdapZendesk(pluginConfigParams))
+        .setCoder(
+            KvCoder.of(
+                NullableCoder.of(WritableCoder.of(NullWritable.class)),
+                SerializableCoder.of(StructuredRecord.class)))
+        .apply(
+            MapValues.into(TypeDescriptors.strings())
+                .via(StructuredRecordUtils::structuredRecordToString))
+        .setCoder(
+            KvCoder.of(
+                NullableCoder.of(WritableCoder.of(NullWritable.class)), StringUtf8Coder.of()))
+        .apply(Values.create())
+        .apply("writeToTxt", TextIO.write().to(options.getOutputTxtFilePathPrefix()));
+
+    return pipeline.run();
+  }
+}
diff --git 
a/examples/java/cdap/zendesk/src/main/java/org/apache/beam/examples/complete/cdap/zendesk/README.md
 
b/examples/java/cdap/zendesk/src/main/java/org/apache/beam/examples/complete/cdap/zendesk/README.md
new file mode 100644
index 00000000000..e9b90f206bf
--- /dev/null
+++ 
b/examples/java/cdap/zendesk/src/main/java/org/apache/beam/examples/complete/cdap/zendesk/README.md
@@ -0,0 +1,55 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+## Gradle preparation
+
+To run this example your `build.gradle` file should contain the following task 
to execute the pipeline:
+
+```
+task executeCdap (type:JavaExec) {
+    mainClass = System.getProperty("mainClass")
+    classpath = sourceSets.main.runtimeClasspath
+    systemProperties System.getProperties()
+    args System.getProperty("exec.args", "").split()
+}
+```
+
+## Running the CdapZendeskToTxt pipeline example
+
+The Gradle 'executeCdap' task lets you run the pipeline with the following command:
+
+```bash
+gradle clean executeCdap 
-DmainClass=org.apache.beam.examples.complete.cdap.zendesk.CdapZendeskToTxt \
+     -Dexec.args="--<argument>=<value> --<argument>=<value>"
+```
+
+To execute this pipeline, specify the parameters in the following format:
+
+```bash
+ --zendeskBaseUrl=zendesk-url-key-followed-by-/%s/%s (example: 
https://support.zendesk.com/%s/%s) \
+ --adminEmail=your-admin-email \
+ --apiToken=your-api-token \
+ --subdomains=your-subdomains (example: api/v2) \
+ --maxRetryCount=your-max-retry-count \
+ --maxRetryWait=your-max-retry-wait \
+ --maxRetryJitterWait=your-max-retry-jitter-wait \
+ --connectTimeout=your-connection-timeout \
+ --readTimeout=your-read-timeout \
+ --objectsToPull=your-objects-to-pull (example: Groups) \
+ --outputTxtFilePathPrefix=your-path-to-output-folder-with-filename-prefix
+```
+Please see CDAP [Zendesk Batch 
Source](https://github.com/data-integrations/zendesk/blob/develop/docs/Zendesk-batchsource.md)
 for more information.
diff --git 
a/examples/java/cdap/zendesk/src/main/java/org/apache/beam/examples/complete/cdap/zendesk/options/CdapZendeskOptions.java
 
b/examples/java/cdap/zendesk/src/main/java/org/apache/beam/examples/complete/cdap/zendesk/options/CdapZendeskOptions.java
new file mode 100644
index 00000000000..3b2ef3152d7
--- /dev/null
+++ 
b/examples/java/cdap/zendesk/src/main/java/org/apache/beam/examples/complete/cdap/zendesk/options/CdapZendeskOptions.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.complete.cdap.zendesk.options;
+
+import org.apache.beam.examples.complete.cdap.options.BaseCdapOptions;
+import org.apache.beam.examples.complete.cdap.zendesk.CdapZendeskToTxt;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.Validation;
+
+/**
+ * The {@link CdapZendeskOptions} interface provides the custom execution options passed by the
+ * executor at the command-line for {@link CdapZendeskToTxt} example.
+ */
+public interface CdapZendeskOptions extends BaseCdapOptions {
+
+  @Validation.Required
+  @Description("Zendesk base url.")
+  String getZendeskBaseUrl();
+
+  void setZendeskBaseUrl(String zendeskBaseUrl);
+
+  @Validation.Required
+  @Description("Zendesk admin email.")
+  String getAdminEmail();
+
+  void setAdminEmail(String adminEmail);
+
+  @Validation.Required
+  @Description("Zendesk api token.")
+  String getApiToken();
+
+  void setApiToken(String apiToken);
+
+  @Default.String("api/v2")
+  @Description("Zendesk subdomains.")
+  String getSubdomains();
+
+  void setSubdomains(String subdomains);
+
+  @Default.Integer(10000)
+  @Description("Zendesk maxRetryCount.")
+  Integer getMaxRetryCount();
+
+  void setMaxRetryCount(Integer maxRetryCount);
+
+  @Default.Integer(10000)
+  @Description("Zendesk maxRetryWait.")
+  Integer getMaxRetryWait();
+
+  void setMaxRetryWait(Integer maxRetryWait);
+
+  @Default.Integer(10000)
+  @Description("Zendesk maxRetryJitterWait.")
+  Integer getMaxRetryJitterWait();
+
+  void setMaxRetryJitterWait(Integer maxRetryJitterWait);
+
+  @Default.Integer(10)
+  @Description("Zendesk connectTimeout.")
+  Integer getConnectTimeout();
+
+  void setConnectTimeout(Integer connectTimeout);
+
+  @Default.Integer(10)
+  @Description("Zendesk readTimeout.")
+  Integer getReadTimeout();
+
+  void setReadTimeout(Integer readTimeout);
+
+  @Validation.Required
+  @Description("Zendesk objectsToPull.")
+  String getObjectsToPull();
+
+  void setObjectsToPull(String objectsToPull);
+
+  @Validation.Required
+  @Description(
+      "Path to output folder with filename prefix. "
+          + "It will write a set of .txt files with names like {prefix}-###.")
+  String getOutputTxtFilePathPrefix();
+
+  void setOutputTxtFilePathPrefix(String outputTxtFilePathPrefix);
+}
diff --git 
a/examples/java/cdap/zendesk/src/main/java/org/apache/beam/examples/complete/cdap/zendesk/options/package-info.java
 
b/examples/java/cdap/zendesk/src/main/java/org/apache/beam/examples/complete/cdap/zendesk/options/package-info.java
new file mode 100644
index 00000000000..70a8b4e1a11
--- /dev/null
+++ 
b/examples/java/cdap/zendesk/src/main/java/org/apache/beam/examples/complete/cdap/zendesk/options/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Pipeline options for the CDAP Zendesk example. */
+package org.apache.beam.examples.complete.cdap.zendesk.options;
diff --git 
a/examples/java/cdap/zendesk/src/main/java/org/apache/beam/examples/complete/cdap/zendesk/package-info.java
 
b/examples/java/cdap/zendesk/src/main/java/org/apache/beam/examples/complete/cdap/zendesk/package-info.java
new file mode 100644
index 00000000000..471710b8d21
--- /dev/null
+++ 
b/examples/java/cdap/zendesk/src/main/java/org/apache/beam/examples/complete/cdap/zendesk/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Example pipeline that reads from the CDAP Zendesk plugin. */
+package org.apache.beam.examples.complete.cdap.zendesk;
diff --git 
a/examples/java/cdap/zendesk/src/main/java/org/apache/beam/examples/complete/cdap/zendesk/transforms/FormatInputTransform.java
 
b/examples/java/cdap/zendesk/src/main/java/org/apache/beam/examples/complete/cdap/zendesk/transforms/FormatInputTransform.java
new file mode 100644
index 00000000000..ff2ef03c534
--- /dev/null
+++ 
b/examples/java/cdap/zendesk/src/main/java/org/apache/beam/examples/complete/cdap/zendesk/transforms/FormatInputTransform.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.complete.cdap.zendesk.transforms;
+
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+
+import io.cdap.cdap.api.data.format.StructuredRecord;
+import io.cdap.cdap.api.plugin.PluginConfig;
+import io.cdap.plugin.zendesk.source.batch.ZendeskBatchSource;
+import io.cdap.plugin.zendesk.source.batch.ZendeskBatchSourceConfig;
+import java.util.Map;
+import org.apache.beam.sdk.io.cdap.CdapIO;
+import org.apache.beam.sdk.io.cdap.ConfigWrapper;
+import org.apache.hadoop.io.NullWritable;
+
+/** Factory for the CDAP Zendesk read transform used by the example pipeline. */
+public class FormatInputTransform {
+
+  /**
+   * Builds a {@link CdapIO.Read} configured for the CDAP {@link ZendeskBatchSource} plugin.
+   *
+   * @param pluginConfigParams Cdap Zendesk plugin config parameters
+   * @return Read transform with {@link NullWritable} keys and {@link StructuredRecord} values
+   */
+  public static CdapIO.Read<NullWritable, StructuredRecord> 
readFromCdapZendesk(
+      Map<String, Object> pluginConfigParams) {
+
+    final PluginConfig pluginConfig =
+        new 
ConfigWrapper<>(ZendeskBatchSourceConfig.class).withParams(pluginConfigParams).build();
+
+    checkStateNotNull(pluginConfig, "Plugin config can't be null.");
+
+    return CdapIO.<NullWritable, StructuredRecord>read()
+        .withCdapPluginClass(ZendeskBatchSource.class)
+        .withPluginConfig(pluginConfig)
+        .withKeyClass(NullWritable.class)
+        .withValueClass(StructuredRecord.class);
+  }
+}
diff --git 
a/examples/java/cdap/zendesk/src/main/java/org/apache/beam/examples/complete/cdap/zendesk/transforms/package-info.java
 
b/examples/java/cdap/zendesk/src/main/java/org/apache/beam/examples/complete/cdap/zendesk/transforms/package-info.java
new file mode 100644
index 00000000000..bc55ee8f240
--- /dev/null
+++ 
b/examples/java/cdap/zendesk/src/main/java/org/apache/beam/examples/complete/cdap/zendesk/transforms/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Transforms used by the CDAP Zendesk example. */
+package org.apache.beam.examples.complete.cdap.zendesk.transforms;
diff --git 
a/examples/java/cdap/zendesk/src/main/java/org/apache/beam/examples/complete/cdap/zendesk/utils/PluginConfigOptionsConverter.java
 
b/examples/java/cdap/zendesk/src/main/java/org/apache/beam/examples/complete/cdap/zendesk/utils/PluginConfigOptionsConverter.java
new file mode 100644
index 00000000000..a7b18cacf05
--- /dev/null
+++ 
b/examples/java/cdap/zendesk/src/main/java/org/apache/beam/examples/complete/cdap/zendesk/utils/PluginConfigOptionsConverter.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.complete.cdap.zendesk.utils;
+
+import io.cdap.plugin.common.Constants;
+import io.cdap.plugin.zendesk.source.batch.ZendeskBatchSourceConfig;
+import io.cdap.plugin.zendesk.source.common.config.BaseZendeskSourceConfig;
+import java.util.Map;
+import 
org.apache.beam.examples.complete.cdap.zendesk.options.CdapZendeskOptions;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+
+/**
+ * Converts {@link CdapZendeskOptions} into the parameter map passed to {@link
+ * org.apache.beam.sdk.io.cdap.ConfigWrapper} when building the Zendesk plugin config.
+ */
+public class PluginConfigOptionsConverter {
+
+  /** Maps the given options to Cdap Zendesk plugin parameters keyed by property constant. */
+  public static Map<String, Object> 
zendeskOptionsToParamsMap(CdapZendeskOptions zendeskOptions) {
+    return ImmutableMap.<String, Object>builder()
+        .put(Constants.Reference.REFERENCE_NAME, 
zendeskOptions.getReferenceName())
+        .put(BaseZendeskSourceConfig.PROPERTY_ADMIN_EMAIL, 
zendeskOptions.getAdminEmail())
+        .put(BaseZendeskSourceConfig.PROPERTY_API_TOKEN, 
zendeskOptions.getApiToken())
+        .put(ZendeskBatchSourceConfig.PROPERTY_URL, 
zendeskOptions.getZendeskBaseUrl())
+        .put(ZendeskBatchSourceConfig.PROPERTY_SUBDOMAINS, 
zendeskOptions.getSubdomains())
+        .put(ZendeskBatchSourceConfig.PROPERTY_MAX_RETRY_COUNT, 
zendeskOptions.getMaxRetryCount())
+        .put(ZendeskBatchSourceConfig.PROPERTY_MAX_RETRY_WAIT, 
zendeskOptions.getMaxRetryWait())
+        .put(
+            ZendeskBatchSourceConfig.PROPERTY_MAX_RETRY_JITTER_WAIT,
+            zendeskOptions.getMaxRetryJitterWait())
+        .put(ZendeskBatchSourceConfig.PROPERTY_CONNECT_TIMEOUT, 
zendeskOptions.getConnectTimeout())
+        .put(ZendeskBatchSourceConfig.PROPERTY_READ_TIMEOUT, 
zendeskOptions.getReadTimeout())
+        .put(BaseZendeskSourceConfig.PROPERTY_OBJECTS_TO_PULL, 
zendeskOptions.getObjectsToPull())
+        .build();
+  }
+}
diff --git 
a/examples/java/cdap/zendesk/src/main/java/org/apache/beam/examples/complete/cdap/zendesk/utils/package-info.java
 
b/examples/java/cdap/zendesk/src/main/java/org/apache/beam/examples/complete/cdap/zendesk/utils/package-info.java
new file mode 100644
index 00000000000..0124a059d8f
--- /dev/null
+++ 
b/examples/java/cdap/zendesk/src/main/java/org/apache/beam/examples/complete/cdap/zendesk/utils/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Utilities used by the CDAP Zendesk example. */
+package org.apache.beam.examples.complete.cdap.zendesk.utils;
diff --git a/settings.gradle.kts b/settings.gradle.kts
index 943942ff865..29f76e424c5 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -54,6 +54,7 @@ include(":examples:java")
 include(":examples:java:twitter")
 include(":examples:java:cdap")
 include(":examples:java:cdap:servicenow")
+include(":examples:java:cdap:zendesk")
 include(":examples:kotlin")
 include(":examples:multi-language")
 include(":model:fn-execution")

Reply via email to