This is an automated email from the ASF dual-hosted git repository.
johncasey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 4886bdf08fd [CdapIO] Complete examples for CDAP Zendesk plugins
(#24589)
4886bdf08fd is described below
commit 4886bdf08fd057a0f8a4e99ab30453089654188d
Author: Vitaly Terentyev <[email protected]>
AuthorDate: Fri Dec 16 01:59:58 2022 +0400
[CdapIO] Complete examples for CDAP Zendesk plugins (#24589)
* Add examples for Cdap Zendesk plugins
* Move common classes to Examples Cdap module
* Fix readme
---
examples/java/cdap/README.md | 2 +-
examples/java/cdap/zendesk/build.gradle | 106 ++++++++++++++++
.../complete/cdap/zendesk/CdapZendeskToTxt.java | 140 +++++++++++++++++++++
.../beam/examples/complete/cdap/zendesk/README.md | 55 ++++++++
.../cdap/zendesk/options/CdapZendeskOptions.java | 99 +++++++++++++++
.../cdap/zendesk/options/package-info.java | 20 +++
.../complete/cdap/zendesk/package-info.java | 20 +++
.../zendesk/transforms/FormatInputTransform.java | 54 ++++++++
.../cdap/zendesk/transforms/package-info.java | 20 +++
.../utils/PluginConfigOptionsConverter.java | 51 ++++++++
.../complete/cdap/zendesk/utils/package-info.java | 20 +++
settings.gradle.kts | 1 +
12 files changed, 587 insertions(+), 1 deletion(-)
diff --git a/examples/java/cdap/README.md b/examples/java/cdap/README.md
index 6cf86335cc7..c6ffc858d88 100644
--- a/examples/java/cdap/README.md
+++ b/examples/java/cdap/README.md
@@ -24,4 +24,4 @@ Supported CDAP plugins:
- [ServiceNow](https://github.com/data-integrations/servicenow-plugins). More
info in the ServiceNow example
[README](servicenow/src/main/java/org/apache/beam/examples/complete/cdap/servicenow/README.md).
- [Salesforce](https://github.com/data-integrations/salesforce)
- [Hubspot](https://github.com/data-integrations/hubspot)
-- [Zendesk](https://github.com/data-integrations/zendesk)
+- [Zendesk](https://github.com/data-integrations/zendesk). More info in the
Zendesk example
[README](zendesk/src/main/java/org/apache/beam/examples/complete/cdap/zendesk/README.md).
diff --git a/examples/java/cdap/zendesk/build.gradle
b/examples/java/cdap/zendesk/build.gradle
new file mode 100644
index 00000000000..277d0761fc3
--- /dev/null
+++ b/examples/java/cdap/zendesk/build.gradle
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import groovy.json.JsonOutput
+
+plugins {
+ id 'java'
+ id 'org.apache.beam.module'
+ id 'com.github.johnrengelman.shadow'
+}
+
+applyJavaNature(
+ exportJavadoc: false,
+ automaticModuleName: 'org.apache.beam.examples.complete.cdap.zendesk',
+)
+
+description = "Apache Beam :: Examples :: Java :: CDAP :: Zendesk"
+ext.summary = """Apache Beam SDK provides a simple, Java-based
+interface for processing virtually any size data. This
+artifact includes CDAP Zendesk Apache Beam Java SDK examples."""
+
+/** Define the list of runners which execute a precommit test.
+ * Some runners are run from separate projects, see the preCommit task below
+ * for details.
+ */
+def preCommitRunners = ["directRunner", "flinkRunner"]
+for (String runner : preCommitRunners) {
+ configurations.create(runner + "PreCommit")
+}
+
+dependencies {
+ implementation
enforcedPlatform(library.java.google_cloud_platform_libraries_bom)
+ implementation project(path: ":sdks:java:core", configuration: "shadow")
+ implementation project(":examples:java:cdap")
+ implementation project(":sdks:java:io:cdap")
+ implementation project(":sdks:java:io:hadoop-common")
+ implementation library.java.cdap_api
+ implementation library.java.cdap_api_commons
+ implementation library.java.cdap_etl_api
+ permitUnusedDeclared library.java.cdap_etl_api
+ implementation library.java.cdap_hydrator_common
+ implementation library.java.cdap_plugin_zendesk
+ implementation library.java.hadoop_common
+ implementation library.java.slf4j_api
+ implementation library.java.vendored_guava_26_0_jre
+ runtimeOnly project(path: ":runners:direct-java", configuration: "shadow")
+
+ // Add dependencies for the PreCommit configurations
+ // For each runner a project level dependency on the examples project.
+ for (String runner : preCommitRunners) {
+ delegate.add(runner + "PreCommit",
project(":examples:java:cdap:zendesk"))
+ delegate.add(runner + "PreCommit", project(path:
":examples:java:cdap:zendesk", configuration: "testRuntimeMigration"))
+ }
+ directRunnerPreCommit project(path: ":runners:direct-java", configuration:
"shadow")
+ flinkRunnerPreCommit
project(":runners:flink:${project.ext.latestFlinkVersion}")
+}
+
+/*
+ * Create a ${runner}PreCommit task for each runner which runs a set
+ * of precommit tests for the CDAP Zendesk examples.
+ */
+def preCommitRunnerClass = [
+ directRunner: "org.apache.beam.runners.direct.DirectRunner",
+ flinkRunner: "org.apache.beam.runners.flink.TestFlinkRunner"
+]
+
+for (String runner : preCommitRunners) {
+ tasks.create(name: runner + "PreCommit", type: Test) {
+ def preCommitBeamTestPipelineOptions = [
+ "--runner=" + preCommitRunnerClass[runner],
+ ]
+ classpath = configurations."${runner}PreCommit"
+ forkEvery 1
+ maxParallelForks 4
+ systemProperty "beamTestPipelineOptions",
JsonOutput.toJson(preCommitBeamTestPipelineOptions)
+ }
+}
+
+/* Define a common precommit task which depends on all the individual
precommits. */
+task preCommit() {
+ for (String runner : preCommitRunners) {
+ dependsOn runner + "PreCommit"
+ }
+}
+
+task executeCdap (type:JavaExec) {
+ mainClass = System.getProperty("mainClass")
+ classpath = sourceSets.main.runtimeClasspath
+ systemProperties System.getProperties()
+ args System.getProperty("exec.args", "").split()
+}
diff --git
a/examples/java/cdap/zendesk/src/main/java/org/apache/beam/examples/complete/cdap/zendesk/CdapZendeskToTxt.java
b/examples/java/cdap/zendesk/src/main/java/org/apache/beam/examples/complete/cdap/zendesk/CdapZendeskToTxt.java
new file mode 100644
index 00000000000..ce8f41e3e91
--- /dev/null
+++
b/examples/java/cdap/zendesk/src/main/java/org/apache/beam/examples/complete/cdap/zendesk/CdapZendeskToTxt.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.complete.cdap.zendesk;
+
+import static
org.apache.beam.examples.complete.cdap.zendesk.transforms.FormatInputTransform.readFromCdapZendesk;
+
+import io.cdap.cdap.api.data.format.StructuredRecord;
+import java.util.Map;
+import org.apache.beam.examples.complete.cdap.utils.StructuredRecordUtils;
+import
org.apache.beam.examples.complete.cdap.zendesk.options.CdapZendeskOptions;
+import
org.apache.beam.examples.complete.cdap.zendesk.utils.PluginConfigOptionsConverter;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.NullableCoder;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.io.hadoop.WritableCoder;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.MapValues;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.hadoop.io.NullWritable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The {@link CdapZendeskToTxt} pipeline is a batch pipeline which ingests
data in JSON format from
+ * CDAP Zendesk, and outputs the resulting records to .txt file. Zendesk
parameters and output txt
+ * file path are specified by the user as template parameters. <br>
+ *
+ * <p><b>Example Usage</b>
+ *
+ * <pre>
+ * # Gradle preparation
+ *
+ * To run this example your {@code build.gradle} file should contain the
following task
+ * to execute the pipeline:
+ * {@code
+ * task executeCdap (type:JavaExec) {
+ * mainClass = System.getProperty("mainClass")
+ * classpath = sourceSets.main.runtimeClasspath
+ * systemProperties System.getProperties()
+ * args System.getProperty("exec.args", "").split()
+ * }
+ * }
+ *
+ * This task lets you run the pipeline with the following command:
+ * {@code
+ * gradle clean executeCdap
-DmainClass=org.apache.beam.examples.complete.cdap.zendesk.CdapZendeskToTxt \
+ * -Dexec.args="--<argument>=<value> --<argument>=<value>"
+ * }
+ *
+ * # Running the pipeline
+ * To execute this pipeline, specify the parameters in the following format:
+ * {@code
+ * --zendeskBaseUrl=your-url \
+ * --adminEmail=your-email \
+ * --apiToken=your-token \
+ * --objectsToPull=Groups \
+ * --referenceName=your-reference-name \
+ * --outputTxtFilePathPrefix=your-path-to-output-folder-with-filename-prefix
+ * }
+ *
+ * By default this will run the pipeline locally with the DirectRunner. To
change the runner, specify:
+ * {@code
+ * --runner=YOUR_SELECTED_RUNNER
+ * }
+ * </pre>
+ */
+public class CdapZendeskToTxt {
+
+ /* Logger for class.*/
+ private static final Logger LOG =
LoggerFactory.getLogger(CdapZendeskToTxt.class);
+
+ /**
+ * Main entry point for pipeline execution.
+ *
+ * @param args Command line arguments to the pipeline.
+ */
+ public static void main(String[] args) {
+ CdapZendeskOptions options =
+
PipelineOptionsFactory.fromArgs(args).withValidation().as(CdapZendeskOptions.class);
+
+ // Create the pipeline
+ Pipeline pipeline = Pipeline.create(options);
+ run(pipeline, options);
+ }
+
+ /**
+ * Runs a pipeline which reads records from CDAP Zendesk and writes them to .txt files.
+ *
+ * @param options arguments to the pipeline
+ */
+ public static PipelineResult run(Pipeline pipeline, CdapZendeskOptions
options) {
+ Map<String, Object> pluginConfigParams =
+ PluginConfigOptionsConverter.zendeskOptionsToParamsMap(options);
+ LOG.info("Starting Cdap-Zendesk-To-Txt pipeline with parameters: {}",
pluginConfigParams);
+
+ /*
+ * Steps:
+ * 1) Read messages in from Cdap Zendesk
+ * 2) Extract values only
+ * 3) Write successful records to .txt file
+ */
+
+ pipeline
+ .apply("readFromCdapZendesk", readFromCdapZendesk(pluginConfigParams))
+ .setCoder(
+ KvCoder.of(
+ NullableCoder.of(WritableCoder.of(NullWritable.class)),
+ SerializableCoder.of(StructuredRecord.class)))
+ .apply(
+ MapValues.into(TypeDescriptors.strings())
+ .via(StructuredRecordUtils::structuredRecordToString))
+ .setCoder(
+ KvCoder.of(
+ NullableCoder.of(WritableCoder.of(NullWritable.class)),
StringUtf8Coder.of()))
+ .apply(Values.create())
+ .apply("writeToTxt",
TextIO.write().to(options.getOutputTxtFilePathPrefix()));
+
+ return pipeline.run();
+ }
+}
diff --git
a/examples/java/cdap/zendesk/src/main/java/org/apache/beam/examples/complete/cdap/zendesk/README.md
b/examples/java/cdap/zendesk/src/main/java/org/apache/beam/examples/complete/cdap/zendesk/README.md
new file mode 100644
index 00000000000..e9b90f206bf
--- /dev/null
+++
b/examples/java/cdap/zendesk/src/main/java/org/apache/beam/examples/complete/cdap/zendesk/README.md
@@ -0,0 +1,55 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+## Gradle preparation
+
+To run this example your `build.gradle` file should contain the following task
to execute the pipeline:
+
+```
+task executeCdap (type:JavaExec) {
+ mainClass = System.getProperty("mainClass")
+ classpath = sourceSets.main.runtimeClasspath
+ systemProperties System.getProperties()
+ args System.getProperty("exec.args", "").split()
+}
+```
+
+## Running the CdapZendeskToTxt pipeline example
+
+The Gradle `executeCdap` task lets you run the pipeline with the following command:
+
+```bash
+gradle clean executeCdap
-DmainClass=org.apache.beam.examples.complete.cdap.zendesk.CdapZendeskToTxt \
+ -Dexec.args="--<argument>=<value> --<argument>=<value>"
+```
+
+To execute this pipeline, specify the parameters in the following format:
+
+```bash
+ --zendeskBaseUrl=zendesk-url-key-followed-by-/%s/%s (example:
https://support.zendesk.com/%s/%s) \
+ --adminEmail=your-admin-email \
+ --apiToken=your-api-token \
+ --subdomains=your-subdomains (example: api/v2) \
+ --maxRetryCount=your-max-retry-count \
+ --maxRetryWait=your-max-retry-wait \
+ --maxRetryJitterWait=your-max-retry-jitter-wait \
+ --connectTimeout=your-connection-timeout \
+ --readTimeout=your-read-timeout \
+ --objectsToPull=your-objects-to-pull (example: Groups) \
+ --outputTxtFilePathPrefix=your-path-to-output-folder-with-filename-prefix
+```
+Please see CDAP [Zendesk Batch
Source](https://github.com/data-integrations/zendesk/blob/develop/docs/Zendesk-batchsource.md)
for more information.
diff --git
a/examples/java/cdap/zendesk/src/main/java/org/apache/beam/examples/complete/cdap/zendesk/options/CdapZendeskOptions.java
b/examples/java/cdap/zendesk/src/main/java/org/apache/beam/examples/complete/cdap/zendesk/options/CdapZendeskOptions.java
new file mode 100644
index 00000000000..3b2ef3152d7
--- /dev/null
+++
b/examples/java/cdap/zendesk/src/main/java/org/apache/beam/examples/complete/cdap/zendesk/options/CdapZendeskOptions.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.complete.cdap.zendesk.options;
+
+import org.apache.beam.examples.complete.cdap.options.BaseCdapOptions;
+import org.apache.beam.examples.complete.cdap.zendesk.CdapZendeskToTxt;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.Validation;
+
+/**
+ * The {@link CdapZendeskOptions} interface provides the custom execution
options passed by the
+ * executor at the command-line for {@link CdapZendeskToTxt} example.
+ */
+public interface CdapZendeskOptions extends BaseCdapOptions {
+
+ @Validation.Required
+ @Description("Zendesk base url.")
+ String getZendeskBaseUrl();
+
+ void setZendeskBaseUrl(String zendeskBaseUrl);
+
+ @Validation.Required
+ @Description("Zendesk admin email.")
+ String getAdminEmail();
+
+ void setAdminEmail(String adminEmail);
+
+ @Validation.Required
+ @Description("Zendesk api token.")
+ String getApiToken();
+
+ void setApiToken(String apiToken);
+
+ @Default.String("api/v2")
+ @Description("Zendesk subdomains.")
+ String getSubdomains();
+
+ void setSubdomains(String subdomains);
+
+ @Default.Integer(10000)
+ @Description("Zendesk maxRetryCount.")
+ Integer getMaxRetryCount();
+
+ void setMaxRetryCount(Integer maxRetryCount);
+
+ @Default.Integer(10000)
+ @Description("Zendesk maxRetryWait.")
+ Integer getMaxRetryWait();
+
+ void setMaxRetryWait(Integer maxRetryWait);
+
+ @Default.Integer(10000)
+ @Description("Zendesk maxRetryJitterWait.")
+ Integer getMaxRetryJitterWait();
+
+ void setMaxRetryJitterWait(Integer maxRetryJitterWait);
+
+ @Default.Integer(10)
+ @Description("Zendesk connectTimeout.")
+ Integer getConnectTimeout();
+
+ void setConnectTimeout(Integer connectTimeout);
+
+ @Default.Integer(10)
+ @Description("Zendesk readTimeout.")
+ Integer getReadTimeout();
+
+ void setReadTimeout(Integer readTimeout);
+
+ @Validation.Required
+ @Description("Zendesk objectsToPull.")
+ String getObjectsToPull();
+
+ void setObjectsToPull(String objectsToPull);
+
+ @Validation.Required
+ @Description(
+ "Path to output folder with filename prefix."
+ + "It will write a set of .txt files with names like {prefix}-###.")
+ String getOutputTxtFilePathPrefix();
+
+ void setOutputTxtFilePathPrefix(String outputTxtFilePathPrefix);
+}
diff --git
a/examples/java/cdap/zendesk/src/main/java/org/apache/beam/examples/complete/cdap/zendesk/options/package-info.java
b/examples/java/cdap/zendesk/src/main/java/org/apache/beam/examples/complete/cdap/zendesk/options/package-info.java
new file mode 100644
index 00000000000..70a8b4e1a11
--- /dev/null
+++
b/examples/java/cdap/zendesk/src/main/java/org/apache/beam/examples/complete/cdap/zendesk/options/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Pipeline options for the CDAP Zendesk example. */
+package org.apache.beam.examples.complete.cdap.zendesk.options;
diff --git
a/examples/java/cdap/zendesk/src/main/java/org/apache/beam/examples/complete/cdap/zendesk/package-info.java
b/examples/java/cdap/zendesk/src/main/java/org/apache/beam/examples/complete/cdap/zendesk/package-info.java
new file mode 100644
index 00000000000..471710b8d21
--- /dev/null
+++
b/examples/java/cdap/zendesk/src/main/java/org/apache/beam/examples/complete/cdap/zendesk/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Example pipeline that reads from the CDAP Zendesk plugin and writes to text files. */
+package org.apache.beam.examples.complete.cdap.zendesk;
diff --git
a/examples/java/cdap/zendesk/src/main/java/org/apache/beam/examples/complete/cdap/zendesk/transforms/FormatInputTransform.java
b/examples/java/cdap/zendesk/src/main/java/org/apache/beam/examples/complete/cdap/zendesk/transforms/FormatInputTransform.java
new file mode 100644
index 00000000000..ff2ef03c534
--- /dev/null
+++
b/examples/java/cdap/zendesk/src/main/java/org/apache/beam/examples/complete/cdap/zendesk/transforms/FormatInputTransform.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.complete.cdap.zendesk.transforms;
+
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+
+import io.cdap.cdap.api.data.format.StructuredRecord;
+import io.cdap.cdap.api.plugin.PluginConfig;
+import io.cdap.plugin.zendesk.source.batch.ZendeskBatchSource;
+import io.cdap.plugin.zendesk.source.batch.ZendeskBatchSourceConfig;
+import java.util.Map;
+import org.apache.beam.sdk.io.cdap.CdapIO;
+import org.apache.beam.sdk.io.cdap.ConfigWrapper;
+import org.apache.hadoop.io.NullWritable;
+
+/** Different input transformations over the processed data in the pipeline. */
+public class FormatInputTransform {
+
+ /**
+ * Configures Cdap Zendesk Read transform.
+ *
+ * @param pluginConfigParams Cdap Zendesk plugin config parameters
+ * @return configured Read transform
+ */
+ public static CdapIO.Read<NullWritable, StructuredRecord>
readFromCdapZendesk(
+ Map<String, Object> pluginConfigParams) {
+
+ final PluginConfig pluginConfig =
+ new
ConfigWrapper<>(ZendeskBatchSourceConfig.class).withParams(pluginConfigParams).build();
+
+ checkStateNotNull(pluginConfig, "Plugin config can't be null.");
+
+ return CdapIO.<NullWritable, StructuredRecord>read()
+ .withCdapPluginClass(ZendeskBatchSource.class)
+ .withPluginConfig(pluginConfig)
+ .withKeyClass(NullWritable.class)
+ .withValueClass(StructuredRecord.class);
+ }
+}
diff --git
a/examples/java/cdap/zendesk/src/main/java/org/apache/beam/examples/complete/cdap/zendesk/transforms/package-info.java
b/examples/java/cdap/zendesk/src/main/java/org/apache/beam/examples/complete/cdap/zendesk/transforms/package-info.java
new file mode 100644
index 00000000000..bc55ee8f240
--- /dev/null
+++
b/examples/java/cdap/zendesk/src/main/java/org/apache/beam/examples/complete/cdap/zendesk/transforms/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Input transforms for the CDAP Zendesk example. */
+package org.apache.beam.examples.complete.cdap.zendesk.transforms;
diff --git
a/examples/java/cdap/zendesk/src/main/java/org/apache/beam/examples/complete/cdap/zendesk/utils/PluginConfigOptionsConverter.java
b/examples/java/cdap/zendesk/src/main/java/org/apache/beam/examples/complete/cdap/zendesk/utils/PluginConfigOptionsConverter.java
new file mode 100644
index 00000000000..a7b18cacf05
--- /dev/null
+++
b/examples/java/cdap/zendesk/src/main/java/org/apache/beam/examples/complete/cdap/zendesk/utils/PluginConfigOptionsConverter.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.complete.cdap.zendesk.utils;
+
+import io.cdap.plugin.common.Constants;
+import io.cdap.plugin.zendesk.source.batch.ZendeskBatchSourceConfig;
+import io.cdap.plugin.zendesk.source.common.config.BaseZendeskSourceConfig;
+import java.util.Map;
+import
org.apache.beam.examples.complete.cdap.zendesk.options.CdapZendeskOptions;
+import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+
+/**
+ * Class for converting CDAP {@link
org.apache.beam.sdk.options.PipelineOptions} to map for {@link
+ * org.apache.beam.sdk.io.cdap.ConfigWrapper}.
+ */
+public class PluginConfigOptionsConverter {
+
+ /** Returns map of parameters for Cdap Zendesk plugin. */
+ public static Map<String, Object>
zendeskOptionsToParamsMap(CdapZendeskOptions zendeskOptions) {
+ return ImmutableMap.<String, Object>builder()
+ .put(Constants.Reference.REFERENCE_NAME,
zendeskOptions.getReferenceName())
+ .put(BaseZendeskSourceConfig.PROPERTY_ADMIN_EMAIL,
zendeskOptions.getAdminEmail())
+ .put(BaseZendeskSourceConfig.PROPERTY_API_TOKEN,
zendeskOptions.getApiToken())
+ .put(ZendeskBatchSourceConfig.PROPERTY_URL,
zendeskOptions.getZendeskBaseUrl())
+ .put(ZendeskBatchSourceConfig.PROPERTY_SUBDOMAINS,
zendeskOptions.getSubdomains())
+ .put(ZendeskBatchSourceConfig.PROPERTY_MAX_RETRY_COUNT,
zendeskOptions.getMaxRetryCount())
+ .put(ZendeskBatchSourceConfig.PROPERTY_MAX_RETRY_WAIT,
zendeskOptions.getMaxRetryWait())
+ .put(
+ ZendeskBatchSourceConfig.PROPERTY_MAX_RETRY_JITTER_WAIT,
+ zendeskOptions.getMaxRetryJitterWait())
+ .put(ZendeskBatchSourceConfig.PROPERTY_CONNECT_TIMEOUT,
zendeskOptions.getConnectTimeout())
+ .put(ZendeskBatchSourceConfig.PROPERTY_READ_TIMEOUT,
zendeskOptions.getReadTimeout())
+ .put(BaseZendeskSourceConfig.PROPERTY_OBJECTS_TO_PULL,
zendeskOptions.getObjectsToPull())
+ .build();
+ }
+}
diff --git
a/examples/java/cdap/zendesk/src/main/java/org/apache/beam/examples/complete/cdap/zendesk/utils/package-info.java
b/examples/java/cdap/zendesk/src/main/java/org/apache/beam/examples/complete/cdap/zendesk/utils/package-info.java
new file mode 100644
index 00000000000..0124a059d8f
--- /dev/null
+++
b/examples/java/cdap/zendesk/src/main/java/org/apache/beam/examples/complete/cdap/zendesk/utils/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Utilities for converting CDAP Zendesk example options to plugin config parameters. */
+package org.apache.beam.examples.complete.cdap.zendesk.utils;
diff --git a/settings.gradle.kts b/settings.gradle.kts
index 943942ff865..29f76e424c5 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -54,6 +54,7 @@ include(":examples:java")
include(":examples:java:twitter")
include(":examples:java:cdap")
include(":examples:java:cdap:servicenow")
+include(":examples:java:cdap:zendesk")
include(":examples:kotlin")
include(":examples:multi-language")
include(":model:fn-execution")