pvillard31 commented on a change in pull request #4053: NIFI-7142: 
Automatically handle schema drift in the PutKudu processor
URL: https://github.com/apache/nifi/pull/4053#discussion_r379702363
 
 

 ##########
 File path: 
nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
 ##########
 @@ -251,16 +267,50 @@ private void trigger(final ProcessContext context, final 
ProcessSession session,
         OperationType prevOperationType = OperationType.INSERT;
         final List<RowError> pendingRowErrors = new ArrayList<>();
         for (FlowFile flowFile : flowFiles) {
-            final String tableName = 
context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
-            final OperationType operationType = 
OperationType.valueOf(context.getProperty(INSERT_OPERATION).evaluateAttributeExpressions(flowFile).getValue());
-            final Boolean ignoreNull = 
Boolean.valueOf(context.getProperty(IGNORE_NULL).evaluateAttributeExpressions(flowFile).getValue());
-            final Boolean lowercaseFields = 
Boolean.valueOf(context.getProperty(LOWERCASE_FIELD_NAMES).evaluateAttributeExpressions(flowFile).getValue());
-
             try (final InputStream in = session.read(flowFile);
                 final RecordReader recordReader = 
recordReaderFactory.createRecordReader(flowFile, in, getLogger())) {
+
+                final String tableName = getEvaluatedProperty(TABLE_NAME, 
context, flowFile);
+                final OperationType operationType = 
OperationType.valueOf(getEvaluatedProperty(INSERT_OPERATION, context, 
flowFile));
+                final Boolean ignoreNull = 
Boolean.valueOf(getEvaluatedProperty(IGNORE_NULL, context, flowFile));
+                final Boolean lowercaseFields = 
Boolean.valueOf(getEvaluatedProperty(LOWERCASE_FIELD_NAMES, context, flowFile));
+                final Boolean handleSchemaDrift = 
Boolean.valueOf(getEvaluatedProperty(HANDLE_SCHEMA_DRIFT, context, flowFile));
+
                 final RecordSet recordSet = recordReader.createRecordSet();
                 final List<String> fieldNames = 
recordReader.getSchema().getFieldNames();
-                final KuduTable kuduTable = kuduClient.openTable(tableName);
+                KuduTable kuduTable = kuduClient.openTable(tableName);
+
+                // If handleSchemaDrift is true, check for any missing columns 
and alter the Kudu table to add them.
+                if (handleSchemaDrift) {
+                    final Schema schema = kuduTable.getSchema();
+                    Stream<RecordField> fields = 
recordReader.getSchema().getFields().stream();
+                    List<RecordField> missing = fields.filter(field -> 
!schema.hasColumn(
+                            lowercaseFields ? 
field.getFieldName().toLowerCase() : field.getFieldName()))
+                            .collect(Collectors.toList());
+                    if (!missing.isEmpty()) {
+                        getLogger().info("adding {} columns to table '{}' to 
handle schema drift",
+                                new Object[]{missing.size(), tableName});
+                        AlterTableOptions alter = new AlterTableOptions();
+                        for (RecordField field : missing) {
+                            String columnName = lowercaseFields ? 
field.getFieldName().toLowerCase() : field.getFieldName();
+                            alter.addNullableColumn(columnName, 
toKuduType(field.getDataType()));
+                        }
+                        try {
+                            kuduClient.alterTable(tableName, alter);
+                        } catch (KuduException e) {
 
 Review comment:
   What happens if the table has columns {a, b},
   then flow file FF1 arrives with {a, b, c},
   another flow file FF2 arrives with {a, b, c, d},
   and the processor is running with multiple threads?
   Suppose FF1 is processed first and adds column c to the table, but FF2 started being 
processed while the table schema was still {a, b}. The ``alter`` object for FF2 will then 
contain the addition of {c, d}. When the exception is thrown because c already exists, is 
column d still added, or not?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to