KAFKA-4173; SchemaProjector should successfully project missing Struct field 
when target field is optional

Author: Shikhar Bhushan <shik...@confluent.io>

Reviewers: Konstantine Karantasis <konstant...@confluent.io>, Jason Gustafson 
<ja...@confluent.io>

Closes #1865 from shikhar/kafka-4173


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/fbcff9b5
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/fbcff9b5
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/fbcff9b5

Branch: refs/heads/0.10.0
Commit: fbcff9b50788497ca548fdfb44c7dc1eb53af1ee
Parents: 1e5bf02
Author: Shikhar Bhushan <shik...@confluent.io>
Authored: Fri Sep 16 15:54:33 2016 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Mon Sep 19 13:33:47 2016 -0700

----------------------------------------------------------------------
 .../kafka/connect/data/SchemaProjector.java     | 12 +++++------
 .../kafka/connect/data/SchemaProjectorTest.java | 21 ++++++++++++++++++++
 2 files changed, 26 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/fbcff9b5/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaProjector.java
----------------------------------------------------------------------
diff --git 
a/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaProjector.java 
b/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaProjector.java
index ad0caf8..6277e44 100644
--- 
a/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaProjector.java
+++ 
b/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaProjector.java
@@ -111,14 +111,12 @@ public class SchemaProjector {
                 } catch (SchemaProjectorException e) {
                     throw new SchemaProjectorException("Error projecting " + 
sourceField.name(), e);
                 }
+            } else if (targetField.schema().isOptional()) {
+                // Ignore missing field
+            } else if (targetField.schema().defaultValue() != null) {
+                targetStruct.put(fieldName, 
targetField.schema().defaultValue());
             } else {
-                Object targetDefault;
-                if (targetField.schema().defaultValue() != null) {
-                    targetDefault = targetField.schema().defaultValue();
-                } else {
-                    throw new SchemaProjectorException("Cannot project " + 
source.schema() + " to " + target.schema());
-                }
-                targetStruct.put(fieldName, targetDefault);
+                throw new SchemaProjectorException("Required field `" +  
fieldName + "` is missing from source schema: " + source);
             }
         }
         return targetStruct;

http://git-wip-us.apache.org/repos/asf/kafka/blob/fbcff9b5/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java
----------------------------------------------------------------------
diff --git 
a/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java
 
b/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java
index 0b1760b..101be04 100644
--- 
a/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java
+++ 
b/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java
@@ -469,6 +469,27 @@ public class SchemaProjectorTest {
         }
     }
 
+    @Test
+    public void testProjectMissingDefaultValuedStructField() {
+        final Schema source = SchemaBuilder.struct().build();
+        final Schema target = SchemaBuilder.struct().field("id", 
SchemaBuilder.int64().defaultValue(42L).build()).build();
+        assertEquals(42L, (long) ((Struct) SchemaProjector.project(source, new 
Struct(source), target)).getInt64("id"));
+    }
+
+    @Test
+    public void testProjectMissingOptionalStructField() {
+        final Schema source = SchemaBuilder.struct().build();
+        final Schema target = SchemaBuilder.struct().field("id", 
SchemaBuilder.OPTIONAL_INT64_SCHEMA).build();
+        assertEquals(null, ((Struct) SchemaProjector.project(source, new 
Struct(source), target)).getInt64("id"));
+    }
+
+    @Test(expected = SchemaProjectorException.class)
+    public void testProjectMissingRequiredField() {
+        final Schema source = SchemaBuilder.struct().build();
+        final Schema target = SchemaBuilder.struct().field("id", 
SchemaBuilder.INT64_SCHEMA).build();
+        SchemaProjector.project(source, new Struct(source), target);
+    }
+
     private void verifyOptionalProjection(Schema source, Type targetType, 
Object value, Object defaultValue, Object expectedProjected, boolean optional) {
         Schema target;
         assert source.isOptional();

Reply via email to