granthenke commented on a change in pull request #3387: NIFI-6009 ScanKudu 
Processor & KuduPut Processor Delete Operation
URL: https://github.com/apache/nifi/pull/3387#discussion_r273570041
 
 

 ##########
 File path: 
nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/kudu/PutKudu.java
 ##########
 @@ -403,92 +354,21 @@ protected KuduSession getKuduSession(final KuduClient 
client) {
         return kuduSession;
     }
 
-    private void flushKuduSession(final KuduSession kuduSession, boolean 
close, final List<RowError> rowErrors) throws KuduException {
-        final List<OperationResponse> responses = close ? kuduSession.close() 
: kuduSession.flush();
-
-        if (kuduSession.getFlushMode() == 
SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND) {
-            
rowErrors.addAll(Arrays.asList(kuduSession.getPendingErrors().getRowErrors()));
-        } else {
-            responses.stream()
-                .filter(OperationResponse::hasRowError)
-                .map(OperationResponse::getRowError)
-                .forEach(rowErrors::add);
-        }
-    }
-
-
-
-    protected Upsert upsertRecordToKudu(KuduTable kuduTable, Record record, 
List<String> fieldNames) {
+    protected Upsert upsertRecordToKudu(KuduClientService clientService, 
KuduTable kuduTable, Record record, List<String> fieldNames) {
         Upsert upsert = kuduTable.newUpsert();
-        this.buildPartialRow(kuduTable.getSchema(), upsert.getRow(), record, 
fieldNames);
+        clientService.buildPartialRow(kuduTable.getSchema(), upsert.getRow(), 
record, fieldNames);
         return upsert;
     }
 
-    protected Insert insertRecordToKudu(KuduTable kuduTable, Record record, 
List<String> fieldNames) {
+    protected Insert insertRecordToKudu(KuduClientService clientService, 
KuduTable kuduTable, Record record, List<String> fieldNames) {
         Insert insert = kuduTable.newInsert();
-        this.buildPartialRow(kuduTable.getSchema(), insert.getRow(), record, 
fieldNames);
+        clientService.buildPartialRow(kuduTable.getSchema(), insert.getRow(), 
record, fieldNames);
         return insert;
     }
 
-    @VisibleForTesting
-    void buildPartialRow(Schema schema, PartialRow row, Record record, 
List<String> fieldNames) {
-        for (String colName : fieldNames) {
-            int colIdx = this.getColumnIndex(schema, colName);
-            if (colIdx != -1) {
-                ColumnSchema colSchema = schema.getColumnByIndex(colIdx);
-                Type colType = colSchema.getType();
-
-                if (record.getValue(colName) == null) {
-                    row.setNull(colName);
-                    continue;
-                }
-
-                switch (colType.getDataType(colSchema.getTypeAttributes())) {
-                    case BOOL:
-                        row.addBoolean(colIdx, record.getAsBoolean(colName));
-                        break;
-                    case FLOAT:
-                        row.addFloat(colIdx, record.getAsFloat(colName));
-                        break;
-                    case DOUBLE:
-                        row.addDouble(colIdx, record.getAsDouble(colName));
-                        break;
-                    case BINARY:
-                        row.addBinary(colIdx, 
record.getAsString(colName).getBytes());
-                        break;
-                    case INT8:
-                        row.addByte(colIdx, 
record.getAsInt(colName).byteValue());
-                        break;
-                    case INT16:
-                        row.addShort(colIdx, 
record.getAsInt(colName).shortValue());
-                        break;
-                    case INT32:
-                        row.addInt(colIdx, record.getAsInt(colName));
-                        break;
-                    case INT64:
-                    case UNIXTIME_MICROS:
-                        row.addLong(colIdx, record.getAsLong(colName));
-                        break;
-                    case STRING:
-                        row.addString(colIdx, record.getAsString(colName));
-                        break;
-                    case DECIMAL32:
-                    case DECIMAL64:
-                    case DECIMAL128:
-                        row.addDecimal(colIdx, new 
BigDecimal(record.getAsString(colName)));
-                        break;
-                    default:
-                        throw new IllegalStateException(String.format("unknown 
column type %s", colType));
-                }
-            }
-        }
-    }
-
-    private int getColumnIndex(Schema columns, String colName) {
-        try {
-            return columns.getColumnIndex(colName);
-        } catch (Exception ex) {
-            return -1;
-        }
+    protected Delete deleteRecordToKudu(KuduClientService clientService, 
KuduTable kuduTable, Record record, List<String> fieldNames) {
 
 Review comment:
   It's worth noting that until Kudu 1.10 is released, delete operations can only 
have the key columns set; if any additional column is set, the operation will 
fail.
   
   This is the relevant commit: 
   
https://github.com/apache/kudu/commit/17fb5a10319d2bc19e8e54aa9c088335b04926e9#diff-7e3732d5ab4a5b01ff1955907bfc82f8
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to