This is an automated email from the ASF dual-hosted git repository.
vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 2ada2ef [HUDI-902] Avoid exception when getSchemaProvider (#1584)
2ada2ef is described below
commit 2ada2ef50fc373ed3083d0e7a96e5e644be52bfb
Author: Raymond Xu <[email protected]>
AuthorDate: Fri May 15 21:33:02 2020 -0700
[HUDI-902] Avoid exception when getSchemaProvider (#1584)
* When there is no new input data, don't throw an exception for a null SchemaProvider
* Return the newly added NullSchemaProvider instead
---
.../apache/hudi/utilities/sources/InputBatch.java | 24 ++++++++++++--
.../hudi/utilities/sources/TestInputBatch.java | 37 ++++++++++++++++++++++
2 files changed, 59 insertions(+), 2 deletions(-)
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/InputBatch.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/InputBatch.java
index dcf56f3..f752e0d 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/InputBatch.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/InputBatch.java
@@ -18,10 +18,14 @@
package org.apache.hudi.utilities.sources;
+import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.avro.Schema;
+import org.apache.spark.api.java.JavaSparkContext;
+
public class InputBatch<T> {
private final Option<T> batch;
@@ -49,9 +53,25 @@ public class InputBatch<T> {
}
public SchemaProvider getSchemaProvider() {
- if (schemaProvider == null) {
+ if (batch.isPresent() && schemaProvider == null) { // data to process requires a real schema provider
throw new HoodieException("Please provide a valid schema provider class!");
}
- return schemaProvider;
+ return schemaProvider != null ? schemaProvider : new NullSchemaProvider(); // placeholder built only when none was supplied (empty batch)
+ }
+
+ public static class NullSchemaProvider extends SchemaProvider {
+ // Schema-less stand-in handed out for a batch that has no data and no configured schema provider.
+ public NullSchemaProvider(TypedProperties properties, JavaSparkContext sparkContext) {
+ super(properties, sparkContext);
+ }
+ // Convenience constructor: delegates with null properties and a null Spark context.
+ public NullSchemaProvider() {
+ this(null, null);
+ }
+ // Reports Avro's NULL primitive type; a new Schema object is created on every call.
+ @Override
+ public Schema getSourceSchema() {
+ return Schema.create(Schema.Type.NULL);
+ }
}
}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestInputBatch.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestInputBatch.java
new file mode 100644
index 0000000..752621d
--- /dev/null
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestInputBatch.java
@@ -0,0 +1,37 @@
+package org.apache.hudi.utilities.sources;
+
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.utilities.schema.RowBasedSchemaProvider;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestInputBatch {
+
+ @Test // a batch carrying data but no provider must fail fast
+ public void getSchemaProviderShouldThrowException() {
+ final InputBatch<String> batchWithoutProvider = new InputBatch<>(Option.of("foo"), null, null);
+ final HoodieException thrown = assertThrows(HoodieException.class, batchWithoutProvider::getSchemaProvider);
+ assertEquals("Please provide a valid schema provider class!", thrown.getMessage());
+ }
+
+ @Test // an empty batch without a provider gets the schema-less placeholder
+ public void getSchemaProviderShouldReturnNullSchemaProvider() {
+ final InputBatch<String> emptyBatch = new InputBatch<>(Option.empty(), null, null);
+ final SchemaProvider returned = emptyBatch.getSchemaProvider();
+ assertTrue(returned instanceof InputBatch.NullSchemaProvider);
+ }
+
+ @Test // a supplied provider is returned as-is (same instance)
+ public void getSchemaProviderShouldReturnGivenSchemaProvider() {
+ final SchemaProvider given = new RowBasedSchemaProvider(null);
+ final InputBatch<String> batchWithProvider = new InputBatch<>(Option.of("foo"), null, given);
+ assertSame(given, batchWithProvider.getSchemaProvider());
+ }
+}