This is an automated email from the ASF dual-hosted git repository.
chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new f30e4e57ecf [BEAM-14081] [CdapIO] Add context classes for CDAP plugins (#17104)
f30e4e57ecf is described below
commit f30e4e57ecfbfe49d5a61bb11a045da52d5fc1ad
Author: Igor Krasavin <[email protected]>
AuthorDate: Wed May 11 19:23:23 2022 +0300
[BEAM-14081] [CdapIO] Add context classes for CDAP plugins (#17104)
* [BEAM-14048] Add ConfigWrapper for building CDAP PluginConfigs
* [BEAM-14048] Fix checkstyle
* [BEAM-14048] Fix warnings
* [BEAM-14048] Fix warnings
* [BEAM-14048] Fix warning
* [BEAM-14048] Fix warning
* [BEAM-14048] Remove unused dependencies
* [BEAM-14048] Add needed dependencies
* [BEAM-14048] Fix spotless
* [BEAM-14048] Fix typo
* [BEAM-14048] Use fori instead of stream
* [BEAM-14048] Suppress warning
* [BEAM-14048] Add used undeclared artifacts
* [BEAM-14048] Change dependencies to test
* Add context.
* Fix dependencies issue
* Add null annotation
* [BEAM-14048] Refactoring
* Add SuppressWarning.
* Fix style.
* Determine dependencies.
* [BEAM-14048] Use CDAP InstantiatorFactory for creating config objects
* [BEAM-14048] Suppress warning
* [BEAM-14081] Refactoring
* Update maven repo
* Update build.gradle
* [BEAM-14081] Refactoring
* [BEAM-14048] Use ServiceNow CDAP dependency from Maven central
* [BEAM-14048] Set macroFields
* [BEAM-14081] Fix javadoc
* [BEAM-14081] Make BatchContextImpl class abstract
Co-authored-by: vitaly.terentyev <[email protected]>
Co-authored-by: Alex Kosolapov <[email protected]>
Co-authored-by: Elizaveta Lomteva <[email protected]>
Co-authored-by: Elizaveta Lomteva <[email protected]>
---
.../org/apache/beam/gradle/BeamModulePlugin.groovy | 3 +
sdks/java/io/cdap/build.gradle | 11 +-
.../beam/sdk/io/cdap/context/BatchContextImpl.java | 232 +++++++++++++++++++++
.../sdk/io/cdap/context/BatchSinkContextImpl.java | 33 +++
.../io/cdap/context/BatchSourceContextImpl.java | 40 ++++
.../io/cdap/context/FailureCollectorWrapper.java | 55 +++++
.../cdap/context/StreamingSourceContextImpl.java | 37 ++++
.../beam/sdk/io/cdap/context/package-info.java | 24 +++
.../sdk/io/cdap/context/BatchContextImplTest.java | 59 ++++++
.../cdap/context/FailureCollectorWrapperTest.java | 93 +++++++++
10 files changed, 585 insertions(+), 2 deletions(-)
diff --git
a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index 4bb05439ab1..74af92e42b8 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -538,8 +538,10 @@ class BeamModulePlugin implements Plugin<Project> {
cassandra_driver_core :
"com.datastax.cassandra:cassandra-driver-core:$cassandra_driver_version",
cassandra_driver_mapping :
"com.datastax.cassandra:cassandra-driver-mapping:$cassandra_driver_version",
cdap_api :
"io.cdap.cdap:cdap-api:$cdap_version",
+ cdap_api_commons :
"io.cdap.cdap:cdap-api-common:$cdap_version",
cdap_common :
"io.cdap.cdap:cdap-common:$cdap_version",
cdap_etl_api :
"io.cdap.cdap:cdap-etl-api:$cdap_version",
+ cdap_etl_api_spark :
"io.cdap.cdap:cdap-etl-api-spark:$cdap_version",
cdap_plugin_service_now :
"io.cdap.plugin:servicenow-plugins:1.1.0",
checker_qual :
"org.checkerframework:checker-qual:$checkerframework_version",
classgraph :
"io.github.classgraph:classgraph:$classgraph_version",
@@ -693,6 +695,7 @@ class BeamModulePlugin implements Plugin<Project> {
spark3_sql :
"org.apache.spark:spark-sql_2.12:$spark3_version",
spark3_streaming :
"org.apache.spark:spark-streaming_2.12:$spark3_version",
stax2_api :
"org.codehaus.woodstox:stax2-api:4.2.1",
+ tephra :
"org.apache.tephra:tephra-api:0.15.0-incubating",
testcontainers_base :
"org.testcontainers:testcontainers:$testcontainers_version",
testcontainers_clickhouse :
"org.testcontainers:clickhouse:$testcontainers_version",
testcontainers_elasticsearch :
"org.testcontainers:elasticsearch:$testcontainers_version",
diff --git a/sdks/java/io/cdap/build.gradle b/sdks/java/io/cdap/build.gradle
index 4ef361924d7..9fe7d305a29 100644
--- a/sdks/java/io/cdap/build.gradle
+++ b/sdks/java/io/cdap/build.gradle
@@ -38,15 +38,22 @@ interface for integration with CDAP plugins."""
*/
dependencies {
- implementation library.java.guava
implementation library.java.cdap_api
- implementation library.java.cdap_common
+ implementation library.java.cdap_api_commons
+ implementation (library.java.cdap_common) {
+ exclude module: "log4j-over-slf4j"
+ }
+ implementation library.java.cdap_etl_api
+ implementation library.java.cdap_etl_api_spark
implementation library.java.jackson_core
implementation library.java.jackson_databind
+ implementation library.java.guava
implementation library.java.slf4j_api
+ implementation library.java.tephra
implementation project(path: ":sdks:java:core", configuration: "shadow")
testImplementation library.java.cdap_plugin_service_now
testImplementation library.java.cdap_etl_api
testImplementation library.java.vendored_guava_26_0_jre
testImplementation library.java.junit
+ testImplementation project(path: ":runners:direct-java", configuration:
"shadow")
}
diff --git
a/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/context/BatchContextImpl.java
b/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/context/BatchContextImpl.java
new file mode 100644
index 00000000000..06b174062df
--- /dev/null
+++
b/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/context/BatchContextImpl.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.cdap.context;
+
+import io.cdap.cdap.api.data.DatasetInstantiationException;
+import io.cdap.cdap.api.data.batch.InputFormatProvider;
+import io.cdap.cdap.api.data.schema.Schema;
+import io.cdap.cdap.api.dataset.Dataset;
+import io.cdap.cdap.api.dataset.DatasetManagementException;
+import io.cdap.cdap.api.dataset.DatasetProperties;
+import io.cdap.cdap.api.metadata.Metadata;
+import io.cdap.cdap.api.metadata.MetadataEntity;
+import io.cdap.cdap.api.metadata.MetadataException;
+import io.cdap.cdap.api.metadata.MetadataScope;
+import io.cdap.cdap.api.plugin.PluginProperties;
+import io.cdap.cdap.etl.api.FailureCollector;
+import io.cdap.cdap.etl.api.Lookup;
+import io.cdap.cdap.etl.api.StageMetrics;
+import io.cdap.cdap.etl.api.SubmitterLifecycle;
+import io.cdap.cdap.etl.api.action.SettableArguments;
+import io.cdap.cdap.etl.api.batch.BatchContext;
+import io.cdap.cdap.etl.api.lineage.field.FieldOperation;
+import java.net.URL;
+import java.sql.Timestamp;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+/** Class for Batch, Sink and Stream CDAP wrapper classes that use it to provide common details. */
+@SuppressWarnings({"TypeParameterUnusedInFormals", "nullness"})
+public abstract class BatchContextImpl implements BatchContext {
+
+ private final FailureCollectorWrapper failureCollector = new
FailureCollectorWrapper();
+
+ /**
+ * This should be set after {@link SubmitterLifecycle#prepareRun(Object)} call with passing this
+ * context object as a param.
+ */
+ protected InputFormatProvider inputFormatProvider;
+
+ private final Timestamp startTime = new
Timestamp(System.currentTimeMillis());
+
+ public InputFormatProvider getInputFormatProvider() {
+ return inputFormatProvider;
+ }
+
+ @Override
+ public String getStageName() {
+ return null;
+ }
+
+ @Override
+ public String getNamespace() {
+ return null;
+ }
+
+ @Override
+ public String getPipelineName() {
+ return null;
+ }
+
+ @Override
+ public long getLogicalStartTime() {
+ return this.startTime.getTime();
+ }
+
+ @Override
+ public StageMetrics getMetrics() {
+ return null;
+ }
+
+ @Override
+ public PluginProperties getPluginProperties() {
+ return null;
+ }
+
+ @Override
+ public PluginProperties getPluginProperties(String pluginId) {
+ return null;
+ }
+
+ @Override
+ public <T> Class<T> loadPluginClass(String pluginId) {
+ return null;
+ }
+
+ @Override
+ public <T> T newPluginInstance(String pluginId) throws
InstantiationException {
+ return null;
+ }
+
+ @Nullable
+ @Override
+ public Schema getInputSchema() {
+ return null;
+ }
+
+ @Override
+ public @Nullable Map<String, Schema> getInputSchemas() {
+ return null;
+ }
+
+ @Override
+ public @Nullable Schema getOutputSchema() {
+ return null;
+ }
+
+ @Override
+ public Map<String, Schema> getOutputPortSchemas() {
+ return null;
+ }
+
+ @Override
+ public void createDataset(String datasetName, String typeName,
DatasetProperties properties)
+ throws DatasetManagementException {}
+
+ @Override
+ public boolean datasetExists(String datasetName) throws
DatasetManagementException {
+ return false;
+ }
+
+ @Override
+ public SettableArguments getArguments() {
+ return null;
+ }
+
+ @Override
+ public FailureCollector getFailureCollector() {
+ return this.failureCollector;
+ }
+
+ @Nullable
+ @Override
+ public URL getServiceURL(String applicationId, String serviceId) {
+ return null;
+ }
+
+ @Nullable
+ @Override
+ public URL getServiceURL(String serviceId) {
+ return null;
+ }
+
+ @Override
+ public Map<MetadataScope, Metadata> getMetadata(MetadataEntity
metadataEntity)
+ throws MetadataException {
+ return null;
+ }
+
+ @Override
+ public Metadata getMetadata(MetadataScope scope, MetadataEntity
metadataEntity)
+ throws MetadataException {
+ return null;
+ }
+
+ @Override
+ public void addProperties(MetadataEntity metadataEntity, Map<String, String>
properties) {}
+
+ @Override
+ public void addTags(MetadataEntity metadataEntity, String... tags) {}
+
+ @Override
+ public void addTags(MetadataEntity metadataEntity, Iterable<String> tags) {}
+
+ @Override
+ public void removeMetadata(MetadataEntity metadataEntity) {}
+
+ @Override
+ public void removeProperties(MetadataEntity metadataEntity) {}
+
+ @Override
+ public void removeProperties(MetadataEntity metadataEntity, String... keys)
{}
+
+ @Override
+ public void removeTags(MetadataEntity metadataEntity) {}
+
+ @Override
+ public void removeTags(MetadataEntity metadataEntity, String... tags) {}
+
+ @Override
+ public void record(List<FieldOperation> fieldOperations) {}
+
+ @Override
+ public <T extends Dataset> T getDataset(String name) throws
DatasetInstantiationException {
+ return null;
+ }
+
+ @Override
+ public <T extends Dataset> T getDataset(String namespace, String name)
+ throws DatasetInstantiationException {
+ return null;
+ }
+
+ @Override
+ public <T extends Dataset> T getDataset(String name, Map<String, String>
arguments)
+ throws DatasetInstantiationException {
+ return null;
+ }
+
+ @Override
+ public <T extends Dataset> T getDataset(
+ String namespace, String name, Map<String, String> arguments)
+ throws DatasetInstantiationException {
+ return null;
+ }
+
+ @Override
+ public void releaseDataset(Dataset dataset) {}
+
+ @Override
+ public void discardDataset(Dataset dataset) {}
+
+ @Override
+ public <T> Lookup<T> provide(String table, Map<String, String> arguments) {
+ return null;
+ }
+}
diff --git
a/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/context/BatchSinkContextImpl.java
b/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/context/BatchSinkContextImpl.java
new file mode 100644
index 00000000000..f0374f7793d
--- /dev/null
+++
b/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/context/BatchSinkContextImpl.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.cdap.context;
+
+import io.cdap.cdap.api.data.batch.Output;
+import io.cdap.cdap.etl.api.batch.BatchSinkContext;
+
+/** Class for creating context object of different CDAP classes with batch sink type. */
+public class BatchSinkContextImpl extends BatchContextImpl implements
BatchSinkContext {
+
+ @Override
+ public void addOutput(Output output) {}
+
+ @Override
+ public boolean isPreviewEnabled() {
+ return false;
+ }
+}
diff --git
a/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/context/BatchSourceContextImpl.java
b/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/context/BatchSourceContextImpl.java
new file mode 100644
index 00000000000..98532936035
--- /dev/null
+++
b/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/context/BatchSourceContextImpl.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.cdap.context;
+
+import io.cdap.cdap.api.data.batch.Input;
+import io.cdap.cdap.etl.api.batch.BatchSourceContext;
+
+/** Class for creating context object of different CDAP classes with batch source type. */
+public class BatchSourceContextImpl extends BatchContextImpl implements
BatchSourceContext {
+
+ @Override
+ public void setInput(Input input) {
+ this.inputFormatProvider = ((Input.InputFormatProviderInput)
input).getInputFormatProvider();
+ }
+
+ @Override
+ public boolean isPreviewEnabled() {
+ return false;
+ }
+
+ @Override
+ public int getMaxPreviewRecords() {
+ return 0;
+ }
+}
diff --git
a/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/context/FailureCollectorWrapper.java
b/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/context/FailureCollectorWrapper.java
new file mode 100644
index 00000000000..d697909d02e
--- /dev/null
+++
b/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/context/FailureCollectorWrapper.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.cdap.context;
+
+import io.cdap.cdap.etl.api.FailureCollector;
+import io.cdap.cdap.etl.api.validation.ValidationException;
+import io.cdap.cdap.etl.api.validation.ValidationFailure;
+import java.util.ArrayList;
+import javax.annotation.Nullable;
+
+/** Class FailureCollectorWrapper is a class for collecting ValidationFailure. */
+public class FailureCollectorWrapper implements FailureCollector {
+ private ArrayList<ValidationFailure> failuresCollection;
+
+ public FailureCollectorWrapper() {
+ this.failuresCollection = new ArrayList<>();
+ }
+
+ @Override
+ public ValidationFailure addFailure(String message, @Nullable String
correctiveAction) {
+ ValidationFailure validationFailure = new ValidationFailure(message,
correctiveAction);
+ failuresCollection.add(validationFailure);
+
+ return validationFailure;
+ }
+
+ @Override
+ public ValidationException getOrThrowException() throws ValidationException {
+ if (failuresCollection.isEmpty()) {
+ return new ValidationException(this.failuresCollection);
+ }
+
+ throw new ValidationException(this.failuresCollection);
+ }
+
+ @Override
+ public ArrayList<ValidationFailure> getValidationFailures() {
+ return this.failuresCollection;
+ }
+}
diff --git
a/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/context/StreamingSourceContextImpl.java
b/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/context/StreamingSourceContextImpl.java
new file mode 100644
index 00000000000..7c09ba19f5f
--- /dev/null
+++
b/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/context/StreamingSourceContextImpl.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.cdap.context;
+
+import io.cdap.cdap.api.data.schema.Schema;
+import io.cdap.cdap.api.dataset.DatasetManagementException;
+import io.cdap.cdap.etl.api.streaming.StreamingSourceContext;
+import javax.annotation.Nullable;
+import org.apache.tephra.TransactionFailureException;
+
+/** Class for creating context object of different CDAP classes with stream source type. */
+public class StreamingSourceContextImpl extends BatchContextImpl implements
StreamingSourceContext {
+
+ @Override
+ public void registerLineage(String referenceName, @Nullable Schema schema)
+ throws DatasetManagementException, TransactionFailureException {}
+
+ @Override
+ public boolean isPreviewEnabled() {
+ return false;
+ }
+}
diff --git
a/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/context/package-info.java
b/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/context/package-info.java
new file mode 100644
index 00000000000..f6548ccdf93
--- /dev/null
+++
b/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/context/package-info.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Context for CDAP classes. */
+@Experimental(Kind.SOURCE_SINK)
+package org.apache.beam.sdk.io.cdap.context;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
diff --git
a/sdks/java/io/cdap/src/test/java/org/apache/beam/sdk/io/cdap/context/BatchContextImplTest.java
b/sdks/java/io/cdap/src/test/java/org/apache/beam/sdk/io/cdap/context/BatchContextImplTest.java
new file mode 100644
index 00000000000..8f679fe3fc0
--- /dev/null
+++
b/sdks/java/io/cdap/src/test/java/org/apache/beam/sdk/io/cdap/context/BatchContextImplTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.cdap.context;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import io.cdap.cdap.etl.api.FailureCollector;
+import io.cdap.cdap.etl.api.validation.ValidationException;
+import java.sql.Timestamp;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Test class for {@link BatchContextImpl}. */
+@RunWith(JUnit4.class)
+public class BatchContextImplTest {
+
+ @Test
+ public void getLogicalStartTime() {
+ /** arrange */
+ Timestamp expectedStartTime = new Timestamp(System.currentTimeMillis());
+ BatchContextImpl context = new BatchSourceContextImpl();
+
+ /** act */
+ long actualStartTime = context.getLogicalStartTime();
+
+ /** assert */
+ assertTrue((expectedStartTime.getTime() - actualStartTime) <= 100);
+ }
+
+ @Test
+ public void getFailureCollector() {
+ /** arrange */
+ BatchContextImpl context = new BatchSinkContextImpl();
+
+ /** act */
+ FailureCollector failureCollector = context.getFailureCollector();
+
+ /** assert */
+ ValidationException validationException =
failureCollector.getOrThrowException();
+ assertEquals(0, validationException.getFailures().size());
+ }
+}
diff --git
a/sdks/java/io/cdap/src/test/java/org/apache/beam/sdk/io/cdap/context/FailureCollectorWrapperTest.java
b/sdks/java/io/cdap/src/test/java/org/apache/beam/sdk/io/cdap/context/FailureCollectorWrapperTest.java
new file mode 100644
index 00000000000..0e35c8a06a5
--- /dev/null
+++
b/sdks/java/io/cdap/src/test/java/org/apache/beam/sdk/io/cdap/context/FailureCollectorWrapperTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.cdap.context;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+
+import io.cdap.cdap.etl.api.validation.ValidationException;
+import io.cdap.cdap.etl.api.validation.ValidationFailure;
+import java.util.ArrayList;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Test class for {@link FailureCollectorWrapper}. */
+@RunWith(JUnit4.class)
+public class FailureCollectorWrapperTest {
+
+ @Test
+ public void addFailure() {
+ /** arrange */
+ FailureCollectorWrapper failureCollectorWrapper = new
FailureCollectorWrapper();
+
+ /** act */
+ RuntimeException error = new RuntimeException("An error has occurred");
+ failureCollectorWrapper.addFailure(error.getMessage(), null);
+
+ /** assert */
+ assertThrows(ValidationException.class, () ->
failureCollectorWrapper.getOrThrowException());
+ }
+
+ @Test
+ public void getOrThrowException() {
+ /** arrange */
+ FailureCollectorWrapper failureCollectorWrapper = new
FailureCollectorWrapper();
+ String errorMessage = "An error has occurred";
+ String expectedMessage = "Errors were encountered during validation. An error has occurred";
+
+ FailureCollectorWrapper emptyFailureCollectorWrapper = new
FailureCollectorWrapper();
+
+ RuntimeException error = new RuntimeException(errorMessage);
+ failureCollectorWrapper.addFailure(error.getMessage(), null);
+
+ /** act && assert */
+ ValidationException e =
+ assertThrows(
+ ValidationException.class, () ->
failureCollectorWrapper.getOrThrowException());
+ assertEquals(expectedMessage, e.getMessage());
+
+ // A case when return ValidationException with empty collector
+ ArrayList<ValidationFailure> exceptionCollector =
+ emptyFailureCollectorWrapper.getValidationFailures();
+ assertEquals(0, exceptionCollector.size());
+ }
+
+ @Test
+ public void getValidationFailures() {
+ /** arrange */
+ FailureCollectorWrapper failureCollectorWrapper = new
FailureCollectorWrapper();
+ String errorMessage = "An error has occurred";
+
+ FailureCollectorWrapper emptyFailureCollectorWrapper = new
FailureCollectorWrapper();
+
+ RuntimeException error = new RuntimeException(errorMessage);
+ failureCollectorWrapper.addFailure(error.getMessage(), null);
+
+ /** act */
+ ArrayList<ValidationFailure> exceptionCollector =
+ failureCollectorWrapper.getValidationFailures();
+ ArrayList<ValidationFailure> emptyExceptionCollector =
+ emptyFailureCollectorWrapper.getValidationFailures();
+
+ /** assert */
+ assertEquals(1, exceptionCollector.size());
+ assertEquals(errorMessage, exceptionCollector.get(0).getMessage());
+ assertEquals(0, emptyExceptionCollector.size());
+ }
+}