This is an automated email from the ASF dual-hosted git repository.

chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new f30e4e57ecf [BEAM-14081] [CdapIO] Add context classes for CDAP plugins 
(#17104)
f30e4e57ecf is described below

commit f30e4e57ecfbfe49d5a61bb11a045da52d5fc1ad
Author: Igor Krasavin <[email protected]>
AuthorDate: Wed May 11 19:23:23 2022 +0300

    [BEAM-14081] [CdapIO] Add context classes for CDAP plugins (#17104)
    
    * [BEAM-14048] Add ConfigWrapper for building CDAP PluginConfigs
    
    * [BEAM-14048] Fix checkstyle
    
    * [BEAM-14048] Fix warnings
    
    * [BEAM-14048] Fix warnings
    
    * [BEAM-14048] Fix warning
    
    * [BEAM-14048] Fix warning
    
    * [BEAM-14048] Remove unused dependencies
    
    * [BEAM-14048] Add needed dependencies
    
    * [BEAM-14048] Fix spotless
    
    * [BEAM-14048] Fix typo
    
    * [BEAM-14048] Use fori instead of stream
    
    * [BEAM-14048] Suppress warning
    
    * [BEAM-14048] Add used undeclared artifacts
    
    * [BEAM-14048] Change dependencies to test
    
    * Add context.
    
    * Fix dependencies issue
    
    * Add null annotation
    
    * [BEAM-14048] Refactoring
    
    * Add SuppressWarning.
    
    * Fix style.
    
    * Determine dependencies.
    
    * [BEAM-14048] Use CDAP InstantiatorFactory for creating config objects
    
    * [BEAM-14048] Suppress warning
    
    * [BEAM-14081] Refactoring
    
    * Update maven repo
    
    * Update build.gradle
    
    * [BEAM-14081] Refactoring
    
    * [BEAM-14048] Use ServiceNow CDAP dependency from Maven central
    
    * [BEAM-14048] Set macroFields
    
    * [BEAM-14081] Fix javadoc
    
    * [BEAM-14081] Make BatchContextImpl class abstract
    
    Co-authored-by: vitaly.terentyev <[email protected]>
    Co-authored-by: Alex Kosolapov <[email protected]>
    Co-authored-by: Elizaveta Lomteva <[email protected]>
    Co-authored-by: Elizaveta Lomteva 
<[email protected]>
---
 .../org/apache/beam/gradle/BeamModulePlugin.groovy |   3 +
 sdks/java/io/cdap/build.gradle                     |  11 +-
 .../beam/sdk/io/cdap/context/BatchContextImpl.java | 232 +++++++++++++++++++++
 .../sdk/io/cdap/context/BatchSinkContextImpl.java  |  33 +++
 .../io/cdap/context/BatchSourceContextImpl.java    |  40 ++++
 .../io/cdap/context/FailureCollectorWrapper.java   |  55 +++++
 .../cdap/context/StreamingSourceContextImpl.java   |  37 ++++
 .../beam/sdk/io/cdap/context/package-info.java     |  24 +++
 .../sdk/io/cdap/context/BatchContextImplTest.java  |  59 ++++++
 .../cdap/context/FailureCollectorWrapperTest.java  |  93 +++++++++
 10 files changed, 585 insertions(+), 2 deletions(-)

diff --git 
a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy 
b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index 4bb05439ab1..74af92e42b8 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -538,8 +538,10 @@ class BeamModulePlugin implements Plugin<Project> {
         cassandra_driver_core                       : 
"com.datastax.cassandra:cassandra-driver-core:$cassandra_driver_version",
         cassandra_driver_mapping                    : 
"com.datastax.cassandra:cassandra-driver-mapping:$cassandra_driver_version",
         cdap_api                                    : 
"io.cdap.cdap:cdap-api:$cdap_version",
+        cdap_api_commons                            : 
"io.cdap.cdap:cdap-api-common:$cdap_version",
         cdap_common                                 : 
"io.cdap.cdap:cdap-common:$cdap_version",
         cdap_etl_api                                : 
"io.cdap.cdap:cdap-etl-api:$cdap_version",
+        cdap_etl_api_spark                          : 
"io.cdap.cdap:cdap-etl-api-spark:$cdap_version",
         cdap_plugin_service_now                     : 
"io.cdap.plugin:servicenow-plugins:1.1.0",
         checker_qual                                : 
"org.checkerframework:checker-qual:$checkerframework_version",
         classgraph                                  : 
"io.github.classgraph:classgraph:$classgraph_version",
@@ -693,6 +695,7 @@ class BeamModulePlugin implements Plugin<Project> {
         spark3_sql                                  : 
"org.apache.spark:spark-sql_2.12:$spark3_version",
         spark3_streaming                            : 
"org.apache.spark:spark-streaming_2.12:$spark3_version",
         stax2_api                                   : 
"org.codehaus.woodstox:stax2-api:4.2.1",
+        tephra                                      : 
"org.apache.tephra:tephra-api:0.15.0-incubating",
         testcontainers_base                         : 
"org.testcontainers:testcontainers:$testcontainers_version",
         testcontainers_clickhouse                   : 
"org.testcontainers:clickhouse:$testcontainers_version",
         testcontainers_elasticsearch                : 
"org.testcontainers:elasticsearch:$testcontainers_version",
diff --git a/sdks/java/io/cdap/build.gradle b/sdks/java/io/cdap/build.gradle
index 4ef361924d7..9fe7d305a29 100644
--- a/sdks/java/io/cdap/build.gradle
+++ b/sdks/java/io/cdap/build.gradle
@@ -38,15 +38,22 @@ interface for integration with CDAP plugins."""
  */
 
 dependencies {
-    implementation library.java.guava
     implementation library.java.cdap_api
-    implementation library.java.cdap_common
+    implementation library.java.cdap_api_commons
+    implementation (library.java.cdap_common) {
+        exclude module: "log4j-over-slf4j"
+    }
+    implementation library.java.cdap_etl_api
+    implementation library.java.cdap_etl_api_spark
     implementation library.java.jackson_core
     implementation library.java.jackson_databind
+    implementation library.java.guava
     implementation library.java.slf4j_api
+    implementation library.java.tephra
     implementation project(path: ":sdks:java:core", configuration: "shadow")
     testImplementation library.java.cdap_plugin_service_now
     testImplementation library.java.cdap_etl_api
     testImplementation library.java.vendored_guava_26_0_jre
     testImplementation library.java.junit
+    testImplementation project(path: ":runners:direct-java", configuration: 
"shadow")
 }
diff --git 
a/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/context/BatchContextImpl.java
 
b/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/context/BatchContextImpl.java
new file mode 100644
index 00000000000..06b174062df
--- /dev/null
+++ 
b/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/context/BatchContextImpl.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.cdap.context;
+
+import io.cdap.cdap.api.data.DatasetInstantiationException;
+import io.cdap.cdap.api.data.batch.InputFormatProvider;
+import io.cdap.cdap.api.data.schema.Schema;
+import io.cdap.cdap.api.dataset.Dataset;
+import io.cdap.cdap.api.dataset.DatasetManagementException;
+import io.cdap.cdap.api.dataset.DatasetProperties;
+import io.cdap.cdap.api.metadata.Metadata;
+import io.cdap.cdap.api.metadata.MetadataEntity;
+import io.cdap.cdap.api.metadata.MetadataException;
+import io.cdap.cdap.api.metadata.MetadataScope;
+import io.cdap.cdap.api.plugin.PluginProperties;
+import io.cdap.cdap.etl.api.FailureCollector;
+import io.cdap.cdap.etl.api.Lookup;
+import io.cdap.cdap.etl.api.StageMetrics;
+import io.cdap.cdap.etl.api.SubmitterLifecycle;
+import io.cdap.cdap.etl.api.action.SettableArguments;
+import io.cdap.cdap.etl.api.batch.BatchContext;
+import io.cdap.cdap.etl.api.lineage.field.FieldOperation;
+import java.net.URL;
+import java.sql.Timestamp;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+/** Common base for the CDAP batch source, batch sink and streaming source 
context classes. */
+@SuppressWarnings({"TypeParameterUnusedInFormals", "nullness"})
+public abstract class BatchContextImpl implements BatchContext {
+
+  private final FailureCollectorWrapper failureCollector = new 
FailureCollectorWrapper();
+
+  /**
+   * This should be set once {@link SubmitterLifecycle#prepareRun(Object)} 
has been called with
+   * this context object as its argument.
+   */
+  protected InputFormatProvider inputFormatProvider;
+
+  private final Timestamp startTime = new 
Timestamp(System.currentTimeMillis());
+
+  public InputFormatProvider getInputFormatProvider() {
+    return inputFormatProvider;
+  }
+
+  @Override
+  public String getStageName() {
+    return null;
+  }
+
+  @Override
+  public String getNamespace() {
+    return null;
+  }
+
+  @Override
+  public String getPipelineName() {
+    return null;
+  }
+
+  @Override
+  public long getLogicalStartTime() {
+    return this.startTime.getTime();
+  }
+
+  @Override
+  public StageMetrics getMetrics() {
+    return null;
+  }
+
+  @Override
+  public PluginProperties getPluginProperties() {
+    return null;
+  }
+
+  @Override
+  public PluginProperties getPluginProperties(String pluginId) {
+    return null;
+  }
+
+  @Override
+  public <T> Class<T> loadPluginClass(String pluginId) {
+    return null;
+  }
+
+  @Override
+  public <T> T newPluginInstance(String pluginId) throws 
InstantiationException {
+    return null;
+  }
+
+  @Nullable
+  @Override
+  public Schema getInputSchema() {
+    return null;
+  }
+
+  @Override
+  public @Nullable Map<String, Schema> getInputSchemas() {
+    return null;
+  }
+
+  @Override
+  public @Nullable Schema getOutputSchema() {
+    return null;
+  }
+
+  @Override
+  public Map<String, Schema> getOutputPortSchemas() {
+    return null;
+  }
+
+  @Override
+  public void createDataset(String datasetName, String typeName, 
DatasetProperties properties)
+      throws DatasetManagementException {}
+
+  @Override
+  public boolean datasetExists(String datasetName) throws 
DatasetManagementException {
+    return false;
+  }
+
+  @Override
+  public SettableArguments getArguments() {
+    return null;
+  }
+
+  @Override
+  public FailureCollector getFailureCollector() {
+    return this.failureCollector;
+  }
+
+  @Nullable
+  @Override
+  public URL getServiceURL(String applicationId, String serviceId) {
+    return null;
+  }
+
+  @Nullable
+  @Override
+  public URL getServiceURL(String serviceId) {
+    return null;
+  }
+
+  @Override
+  public Map<MetadataScope, Metadata> getMetadata(MetadataEntity 
metadataEntity)
+      throws MetadataException {
+    return null;
+  }
+
+  @Override
+  public Metadata getMetadata(MetadataScope scope, MetadataEntity 
metadataEntity)
+      throws MetadataException {
+    return null;
+  }
+
+  @Override
+  public void addProperties(MetadataEntity metadataEntity, Map<String, String> 
properties) {}
+
+  @Override
+  public void addTags(MetadataEntity metadataEntity, String... tags) {}
+
+  @Override
+  public void addTags(MetadataEntity metadataEntity, Iterable<String> tags) {}
+
+  @Override
+  public void removeMetadata(MetadataEntity metadataEntity) {}
+
+  @Override
+  public void removeProperties(MetadataEntity metadataEntity) {}
+
+  @Override
+  public void removeProperties(MetadataEntity metadataEntity, String... keys) 
{}
+
+  @Override
+  public void removeTags(MetadataEntity metadataEntity) {}
+
+  @Override
+  public void removeTags(MetadataEntity metadataEntity, String... tags) {}
+
+  @Override
+  public void record(List<FieldOperation> fieldOperations) {}
+
+  @Override
+  public <T extends Dataset> T getDataset(String name) throws 
DatasetInstantiationException {
+    return null;
+  }
+
+  @Override
+  public <T extends Dataset> T getDataset(String namespace, String name)
+      throws DatasetInstantiationException {
+    return null;
+  }
+
+  @Override
+  public <T extends Dataset> T getDataset(String name, Map<String, String> 
arguments)
+      throws DatasetInstantiationException {
+    return null;
+  }
+
+  @Override
+  public <T extends Dataset> T getDataset(
+      String namespace, String name, Map<String, String> arguments)
+      throws DatasetInstantiationException {
+    return null;
+  }
+
+  @Override
+  public void releaseDataset(Dataset dataset) {}
+
+  @Override
+  public void discardDataset(Dataset dataset) {}
+
+  @Override
+  public <T> Lookup<T> provide(String table, Map<String, String> arguments) {
+    return null;
+  }
+}
diff --git 
a/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/context/BatchSinkContextImpl.java
 
b/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/context/BatchSinkContextImpl.java
new file mode 100644
index 00000000000..f0374f7793d
--- /dev/null
+++ 
b/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/context/BatchSinkContextImpl.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.cdap.context;
+
+import io.cdap.cdap.api.data.batch.Output;
+import io.cdap.cdap.etl.api.batch.BatchSinkContext;
+
+/** {@link BatchSinkContext} implementation for CDAP classes of the batch 
sink type. */
+public class BatchSinkContextImpl extends BatchContextImpl implements 
BatchSinkContext {
+
+  @Override
+  public void addOutput(Output output) {}
+
+  @Override
+  public boolean isPreviewEnabled() {
+    return false;
+  }
+}
diff --git 
a/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/context/BatchSourceContextImpl.java
 
b/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/context/BatchSourceContextImpl.java
new file mode 100644
index 00000000000..98532936035
--- /dev/null
+++ 
b/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/context/BatchSourceContextImpl.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.cdap.context;
+
+import io.cdap.cdap.api.data.batch.Input;
+import io.cdap.cdap.etl.api.batch.BatchSourceContext;
+
+/** {@link BatchSourceContext} implementation for CDAP classes of the batch 
source type. */
+public class BatchSourceContextImpl extends BatchContextImpl implements 
BatchSourceContext {
+
+  @Override
+  public void setInput(Input input) {
+    this.inputFormatProvider = ((Input.InputFormatProviderInput) 
input).getInputFormatProvider();
+  }
+
+  @Override
+  public boolean isPreviewEnabled() {
+    return false;
+  }
+
+  @Override
+  public int getMaxPreviewRecords() {
+    return 0;
+  }
+}
diff --git 
a/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/context/FailureCollectorWrapper.java
 
b/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/context/FailureCollectorWrapper.java
new file mode 100644
index 00000000000..d697909d02e
--- /dev/null
+++ 
b/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/context/FailureCollectorWrapper.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.cdap.context;
+
+import io.cdap.cdap.etl.api.FailureCollector;
+import io.cdap.cdap.etl.api.validation.ValidationException;
+import io.cdap.cdap.etl.api.validation.ValidationFailure;
+import java.util.ArrayList;
+import javax.annotation.Nullable;
+
+/** A {@link FailureCollector} that accumulates {@link ValidationFailure}s in a 
list. */
+public class FailureCollectorWrapper implements FailureCollector {
+  private ArrayList<ValidationFailure> failuresCollection;
+
+  public FailureCollectorWrapper() {
+    this.failuresCollection = new ArrayList<>();
+  }
+
+  @Override
+  public ValidationFailure addFailure(String message, @Nullable String 
correctiveAction) {
+    ValidationFailure validationFailure = new ValidationFailure(message, 
correctiveAction);
+    failuresCollection.add(validationFailure);
+
+    return validationFailure;
+  }
+
+  @Override
+  public ValidationException getOrThrowException() throws ValidationException {
+    if (failuresCollection.isEmpty()) {
+      return new ValidationException(this.failuresCollection);
+    }
+
+    throw new ValidationException(this.failuresCollection);
+  }
+
+  @Override
+  public ArrayList<ValidationFailure> getValidationFailures() {
+    return this.failuresCollection;
+  }
+}
diff --git 
a/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/context/StreamingSourceContextImpl.java
 
b/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/context/StreamingSourceContextImpl.java
new file mode 100644
index 00000000000..7c09ba19f5f
--- /dev/null
+++ 
b/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/context/StreamingSourceContextImpl.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.cdap.context;
+
+import io.cdap.cdap.api.data.schema.Schema;
+import io.cdap.cdap.api.dataset.DatasetManagementException;
+import io.cdap.cdap.etl.api.streaming.StreamingSourceContext;
+import javax.annotation.Nullable;
+import org.apache.tephra.TransactionFailureException;
+
+/** {@link StreamingSourceContext} implementation for CDAP classes of the 
streaming source type. */
+public class StreamingSourceContextImpl extends BatchContextImpl implements 
StreamingSourceContext {
+
+  @Override
+  public void registerLineage(String referenceName, @Nullable Schema schema)
+      throws DatasetManagementException, TransactionFailureException {}
+
+  @Override
+  public boolean isPreviewEnabled() {
+    return false;
+  }
+}
diff --git 
a/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/context/package-info.java
 
b/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/context/package-info.java
new file mode 100644
index 00000000000..f6548ccdf93
--- /dev/null
+++ 
b/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/context/package-info.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Context for CDAP classes. */
+@Experimental(Kind.SOURCE_SINK)
+package org.apache.beam.sdk.io.cdap.context;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
diff --git 
a/sdks/java/io/cdap/src/test/java/org/apache/beam/sdk/io/cdap/context/BatchContextImplTest.java
 
b/sdks/java/io/cdap/src/test/java/org/apache/beam/sdk/io/cdap/context/BatchContextImplTest.java
new file mode 100644
index 00000000000..8f679fe3fc0
--- /dev/null
+++ 
b/sdks/java/io/cdap/src/test/java/org/apache/beam/sdk/io/cdap/context/BatchContextImplTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.cdap.context;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import io.cdap.cdap.etl.api.FailureCollector;
+import io.cdap.cdap.etl.api.validation.ValidationException;
+import java.sql.Timestamp;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Test class for {@link BatchContextImpl}. */
+@RunWith(JUnit4.class)
+public class BatchContextImplTest {
+
+  @Test
+  public void getLogicalStartTime() {
+    // arrange: sample the clock just before the context records its own start time
+    Timestamp expectedStartTime = new Timestamp(System.currentTimeMillis());
+    BatchContextImpl context = new BatchSourceContextImpl();
+
+    // act
+    long actualStartTime = context.getLogicalStartTime();
+
+    // assert: use the absolute gap, since (expected - actual) alone is <= 0 for any delay
+    assertTrue(Math.abs(actualStartTime - expectedStartTime.getTime()) <= 100);
+  }
+
+  @Test
+  public void getFailureCollector() {
+    /** arrange */
+    BatchContextImpl context = new BatchSinkContextImpl();
+
+    /** act */
+    FailureCollector failureCollector = context.getFailureCollector();
+
+    /** assert */
+    ValidationException validationException = 
failureCollector.getOrThrowException();
+    assertEquals(0, validationException.getFailures().size());
+  }
+}
diff --git 
a/sdks/java/io/cdap/src/test/java/org/apache/beam/sdk/io/cdap/context/FailureCollectorWrapperTest.java
 
b/sdks/java/io/cdap/src/test/java/org/apache/beam/sdk/io/cdap/context/FailureCollectorWrapperTest.java
new file mode 100644
index 00000000000..0e35c8a06a5
--- /dev/null
+++ 
b/sdks/java/io/cdap/src/test/java/org/apache/beam/sdk/io/cdap/context/FailureCollectorWrapperTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.cdap.context;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+
+import io.cdap.cdap.etl.api.validation.ValidationException;
+import io.cdap.cdap.etl.api.validation.ValidationFailure;
+import java.util.ArrayList;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Test class for {@link FailureCollectorWrapper}. */
+@RunWith(JUnit4.class)
+public class FailureCollectorWrapperTest {
+
+  @Test
+  public void addFailure() {
+    /** arrange */
+    FailureCollectorWrapper failureCollectorWrapper = new 
FailureCollectorWrapper();
+
+    /** act */
+    RuntimeException error = new RuntimeException("An error has occurred");
+    failureCollectorWrapper.addFailure(error.getMessage(), null);
+
+    /** assert */
+    assertThrows(ValidationException.class, () -> 
failureCollectorWrapper.getOrThrowException());
+  }
+
+  @Test
+  public void getOrThrowException() {
+    // arrange: one collector holding a single failure and one left empty
+    FailureCollectorWrapper failureCollectorWrapper = new 
FailureCollectorWrapper();
+    String errorMessage = "An error has occurred";
+    String expectedMessage = "Errors were encountered during validation. An 
error has occurred";
+
+    FailureCollectorWrapper emptyFailureCollectorWrapper = new 
FailureCollectorWrapper();
+
+    RuntimeException error = new RuntimeException(errorMessage);
+    failureCollectorWrapper.addFailure(error.getMessage(), null);
+
+    // act && assert: a collector with failures must throw
+    ValidationException e =
+        assertThrows(
+            ValidationException.class, () -> 
failureCollectorWrapper.getOrThrowException());
+    assertEquals(expectedMessage, e.getMessage());
+
+    // With no recorded failures the exception is returned, not thrown, and it is empty.
+    ValidationException emptyException = emptyFailureCollectorWrapper.getOrThrowException();
+    assertEquals(0, emptyException.getFailures().size());
+    assertEquals(0, emptyFailureCollectorWrapper.getValidationFailures().size());
+  }
+
+  @Test
+  public void getValidationFailures() {
+    /** arrange */
+    FailureCollectorWrapper failureCollectorWrapper = new 
FailureCollectorWrapper();
+    String errorMessage = "An error has occurred";
+
+    FailureCollectorWrapper emptyFailureCollectorWrapper = new 
FailureCollectorWrapper();
+
+    RuntimeException error = new RuntimeException(errorMessage);
+    failureCollectorWrapper.addFailure(error.getMessage(), null);
+
+    /** act */
+    ArrayList<ValidationFailure> exceptionCollector =
+        failureCollectorWrapper.getValidationFailures();
+    ArrayList<ValidationFailure> emptyExceptionCollector =
+        emptyFailureCollectorWrapper.getValidationFailures();
+
+    /** assert */
+    assertEquals(1, exceptionCollector.size());
+    assertEquals(errorMessage, exceptionCollector.get(0).getMessage());
+    assertEquals(0, emptyExceptionCollector.size());
+  }
+}

Reply via email to