github-advanced-security[bot] commented on code in PR #15755:
URL: https://github.com/apache/druid/pull/15755#discussion_r1466003412


##########
extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaInputRow.java:
##########
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.delta.input;
+
+import io.delta.kernel.types.BinaryType;
+import io.delta.kernel.types.BooleanType;
+import io.delta.kernel.types.ByteType;
+import io.delta.kernel.types.DataType;
+import io.delta.kernel.types.DateType;
+import io.delta.kernel.types.DecimalType;
+import io.delta.kernel.types.DoubleType;
+import io.delta.kernel.types.FloatType;
+import io.delta.kernel.types.IntegerType;
+import io.delta.kernel.types.LongType;
+import io.delta.kernel.types.ShortType;
+import io.delta.kernel.types.StringType;
+import io.delta.kernel.types.StructField;
+import io.delta.kernel.types.StructType;
+import io.delta.kernel.types.TimestampType;
+import it.unimi.dsi.fastutil.objects.Object2IntMap;
+import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.data.input.Row;
+import org.apache.druid.data.input.impl.MapInputRowParser;
+import org.apache.druid.error.InvalidInput;
+import org.joda.time.DateTime;
+
+import javax.annotation.Nullable;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Encodes the row and schema information from the Delta Lake.
+ */
+public class DeltaInputRow implements InputRow
+{
+  private final io.delta.kernel.data.Row row;
+  private final StructType schema;
+  private final Object2IntMap<String> fieldNameToOrdinal = new 
Object2IntOpenHashMap<>();
+  private final InputRow delegateRow;
+
+  private static final ZoneId ZONE_ID = ZoneId.systemDefault();
+
+  public DeltaInputRow(io.delta.kernel.data.Row row, InputRowSchema 
inputRowSchema)
+  {
+    this.row = row;
+    this.schema = row.getSchema();
+    List<String> fieldNames = this.schema.fieldNames();
+    for (int i = 0; i < fieldNames.size(); ++i) {
+      fieldNameToOrdinal.put(fieldNames.get(i), i);
+    }
+    fieldNameToOrdinal.defaultReturnValue(-1);
+
+    Map<String, Object> theMap = new HashMap<>();
+    for (String fieldName : fieldNames) {
+      theMap.put(fieldName, _getRaw(fieldName));
+    }
+    delegateRow = MapInputRowParser.parse(inputRowSchema, theMap);
+  }
+
+  @Override
+  public List<String> getDimensions()
+  {
+    return delegateRow.getDimensions();
+  }
+
+  @Override
+  public long getTimestampFromEpoch()
+  {
+    return delegateRow.getTimestampFromEpoch();
+  }
+
+  @Override
+  public DateTime getTimestamp()
+  {
+    return delegateRow.getTimestamp();
+  }
+
+  @Override
+  public List<String> getDimension(String dimension)
+  {
+    return delegateRow.getDimension(dimension);
+  }
+
+  @Nullable
+  @Override
+  public Object getRaw(String dimension)
+  {
+    return delegateRow.getRaw(dimension);
+  }
+
+  @Nullable
+  @Override
+  public Number getMetric(String metric)
+  {
+    return delegateRow.getMetric(metric);
+  }
+
+  @Override
+  public int compareTo(Row o)
+  {
+    return this.getTimestamp().compareTo(o.getTimestamp());
+  }
+
+  @Override
+  public String toString()
+  {
+    return "DeltaInputRow{" +
+           "row=" + row +
+           ", schema=" + schema +
+           ", fieldNameToOrdinal=" + fieldNameToOrdinal +
+           ", delegateRow=" + delegateRow +
+           '}';
+  }
+
+  public Map<String, Object> getRawRowAsMap()
+  {
+    return RowSerde.convertRowToJsonObject(row);
+  }
+
+  @Nullable
+  private Object _getRaw(String dimension)
+  {
+    StructField field = schema.get(dimension);
+    if (field == null || field.isMetadataColumn()) {
+      return null;
+    }
+
+    int ordinal = fieldNameToOrdinal.getInt(dimension);
+    if (ordinal < 0) {
+      return null;
+    }
+    return getValue(field.getDataType(), row, ordinal);
+  }
+
+  @Nullable
+  private static Object getValue(DataType dataType, io.delta.kernel.data.Row 
dataRow, int columnOrdinal)
+  {
+    if (dataRow.isNullAt(columnOrdinal)) {

Review Comment:
   ## Chain of 'instanceof' tests
   
   This `if` block performs a chain of 12 `instanceof` type tests; consider alternatives such as polymorphism or the visitor pattern.
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/6509)



##########
extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaInputRow.java:
##########
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.delta.input;
+
+import io.delta.kernel.types.BinaryType;
+import io.delta.kernel.types.BooleanType;
+import io.delta.kernel.types.ByteType;
+import io.delta.kernel.types.DataType;
+import io.delta.kernel.types.DateType;
+import io.delta.kernel.types.DecimalType;
+import io.delta.kernel.types.DoubleType;
+import io.delta.kernel.types.FloatType;
+import io.delta.kernel.types.IntegerType;
+import io.delta.kernel.types.LongType;
+import io.delta.kernel.types.ShortType;
+import io.delta.kernel.types.StringType;
+import io.delta.kernel.types.StructField;
+import io.delta.kernel.types.StructType;
+import io.delta.kernel.types.TimestampType;
+import it.unimi.dsi.fastutil.objects.Object2IntMap;
+import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.data.input.Row;
+import org.apache.druid.data.input.impl.MapInputRowParser;
+import org.apache.druid.error.InvalidInput;
+import org.joda.time.DateTime;
+
+import javax.annotation.Nullable;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Encodes the row and schema information from the Delta Lake.
+ */
+public class DeltaInputRow implements InputRow

Review Comment:
   ## Inconsistent compareTo
   
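   This class declares `compareTo` but inherits `equals`; the two could be inconsistent. For example, `compareTo` orders rows by timestamp alone, so two distinct rows with the same timestamp compare as equal (0) even though the inherited `equals` would not consider them equal.
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/6510)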



##########
extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/RowSerde.java:
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.delta.input;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import io.delta.kernel.client.TableClient;
+import io.delta.kernel.data.Row;
+import io.delta.kernel.defaults.internal.data.DefaultJsonRow;
+import io.delta.kernel.internal.types.TableSchemaSerDe;
+import io.delta.kernel.internal.util.VectorUtils;
+import io.delta.kernel.types.ArrayType;
+import io.delta.kernel.types.BooleanType;
+import io.delta.kernel.types.ByteType;
+import io.delta.kernel.types.DataType;
+import io.delta.kernel.types.DateType;
+import io.delta.kernel.types.DoubleType;
+import io.delta.kernel.types.FloatType;
+import io.delta.kernel.types.IntegerType;
+import io.delta.kernel.types.LongType;
+import io.delta.kernel.types.MapType;
+import io.delta.kernel.types.ShortType;
+import io.delta.kernel.types.StringType;
+import io.delta.kernel.types.StructField;
+import io.delta.kernel.types.StructType;
+import io.delta.kernel.types.TimestampType;
+import org.apache.druid.error.InvalidInput;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Utility class to serialize and deserialize {@link Row} object.
+ * Code borrowed from <a 
href="https://github.com/delta-io/delta/blob/master/kernel/examples/table-reader/src/main/java/io/delta/kernel/examples/utils/RowSerDe.java";>
+ * RowSerde.java</a>.
+ *
+ */
+public class RowSerde
+{
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+  private RowSerde()
+  {
+  }
+
+  /**
+   * Utility method to serialize a {@link Row} as a JSON string
+   */
+  public static String serializeRowToJson(Row row)
+  {
+    Map<String, Object> rowObject = convertRowToJsonObject(row);
+    try {
+      Map<String, Object> rowWithSchema = new HashMap<>();
+      rowWithSchema.put("schema", TableSchemaSerDe.toJson(row.getSchema()));
+      rowWithSchema.put("row", rowObject);
+      return OBJECT_MAPPER.writeValueAsString(rowWithSchema);
+    }
+    catch (JsonProcessingException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Utility method to deserialize a {@link Row} object from the JSON form.
+   */
+  public static Row deserializeRowFromJson(TableClient tableClient, String 
jsonRowWithSchema)
+  {
+    try {
+      JsonNode jsonNode = OBJECT_MAPPER.readTree(jsonRowWithSchema);
+      JsonNode schemaNode = jsonNode.get("schema");
+      StructType schema =
+          TableSchemaSerDe.fromJson(tableClient.getJsonHandler(), 
schemaNode.asText());
+      return parseRowFromJsonWithSchema((ObjectNode) jsonNode.get("row"), 
schema);
+    }
+    catch (JsonProcessingException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public static Map<String, Object> convertRowToJsonObject(Row row)
+  {
+    StructType rowType = row.getSchema();
+    Map<String, Object> rowObject = new HashMap<>();
+    for (int fieldId = 0; fieldId < rowType.length(); fieldId++) {
+      StructField field = rowType.at(fieldId);
+      DataType fieldType = field.getDataType();
+      String name = field.getName();
+
+      if (row.isNullAt(fieldId)) {
+        rowObject.put(name, null);
+        continue;
+      }
+
+      Object value;
+      if (fieldType instanceof BooleanType) {

Review Comment:
   ## Chain of 'instanceof' tests
   
   This `if` block performs a chain of 13 `instanceof` type tests; consider alternatives such as polymorphism or the visitor pattern.
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/6508)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to