granthenke commented on a change in pull request #4053: NIFI-7142: Automatically handle schema drift in the PutKudu processor URL: https://github.com/apache/nifi/pull/4053#discussion_r379712766
########## File path: nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java ########## @@ -251,16 +267,50 @@ private void trigger(final ProcessContext context, final ProcessSession session, OperationType prevOperationType = OperationType.INSERT; final List<RowError> pendingRowErrors = new ArrayList<>(); for (FlowFile flowFile : flowFiles) { - final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue(); - final OperationType operationType = OperationType.valueOf(context.getProperty(INSERT_OPERATION).evaluateAttributeExpressions(flowFile).getValue()); - final Boolean ignoreNull = Boolean.valueOf(context.getProperty(IGNORE_NULL).evaluateAttributeExpressions(flowFile).getValue()); - final Boolean lowercaseFields = Boolean.valueOf(context.getProperty(LOWERCASE_FIELD_NAMES).evaluateAttributeExpressions(flowFile).getValue()); - try (final InputStream in = session.read(flowFile); final RecordReader recordReader = recordReaderFactory.createRecordReader(flowFile, in, getLogger())) { + + final String tableName = getEvaluatedProperty(TABLE_NAME, context, flowFile); + final OperationType operationType = OperationType.valueOf(getEvaluatedProperty(INSERT_OPERATION, context, flowFile)); + final Boolean ignoreNull = Boolean.valueOf(getEvaluatedProperty(IGNORE_NULL, context, flowFile)); + final Boolean lowercaseFields = Boolean.valueOf(getEvaluatedProperty(LOWERCASE_FIELD_NAMES, context, flowFile)); + final Boolean handleSchemaDrift = Boolean.valueOf(getEvaluatedProperty(HANDLE_SCHEMA_DRIFT, context, flowFile)); + final RecordSet recordSet = recordReader.createRecordSet(); final List<String> fieldNames = recordReader.getSchema().getFieldNames(); - final KuduTable kuduTable = kuduClient.openTable(tableName); + KuduTable kuduTable = kuduClient.openTable(tableName); + + // If handleSchemaDrift is true, check for any missing columns and alter the Kudu table to add them. 
+ if (handleSchemaDrift) { + final Schema schema = kuduTable.getSchema(); + Stream<RecordField> fields = recordReader.getSchema().getFields().stream(); + List<RecordField> missing = fields.filter(field -> !schema.hasColumn( + lowercaseFields ? field.getFieldName().toLowerCase() : field.getFieldName())) + .collect(Collectors.toList()); + if (!missing.isEmpty()) { + getLogger().info("adding {} columns to table '{}' to handle schema drift", + new Object[]{missing.size(), tableName}); + AlterTableOptions alter = new AlterTableOptions(); + for (RecordField field : missing) { + String columnName = lowercaseFields ? field.getFieldName().toLowerCase() : field.getFieldName(); + alter.addNullableColumn(columnName, toKuduType(field.getDataType())); + } + try { + kuduClient.alterTable(tableName, alter); + } catch (KuduException e) { + // Ignore the exception if the column already exists due to concurrent + // threads or applications attempting to handle schema drift. + if (e.getStatus().isAlreadyPresent()) { + getLogger().info("column already exists in table ' {}' while handling schema drift", Review comment: I had considered that, but decided against it in case there were any security concerns with that type of content being logged. This shouldn't be an issue that someone needs to track down, and if it is, they should be able to debug it by cross-referencing the fields and the table schema themselves. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services