alexeykudinkin commented on code in PR #5523:
URL: https://github.com/apache/hudi/pull/5523#discussion_r923729365
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/model/HoodieInternalRow.java:
##########
@@ -57,187 +92,153 @@ public int numFields() {
}
@Override
- public void setNullAt(int i) {
- if (i < HoodieRecord.HOODIE_META_COLUMNS.size()) {
- switch (i) {
- case 0: {
- this.commitTime = null;
- break;
- }
- case 1: {
- this.commitSeqNumber = null;
- break;
- }
- case 2: {
- this.recordKey = null;
- break;
- }
- case 3: {
- this.partitionPath = null;
- break;
- }
- case 4: {
- this.fileName = null;
- break;
- }
- default: throw new IllegalArgumentException("Not expected");
- }
+ public void setNullAt(int ordinal) {
+ if (ordinal < metaFields.length) {
+ metaFields[ordinal] = null;
} else {
- row.setNullAt(i);
+ row.setNullAt(rebaseOrdinal(ordinal));
}
}
@Override
- public void update(int i, Object value) {
- if (i < HoodieRecord.HOODIE_META_COLUMNS.size()) {
- switch (i) {
- case 0: {
- this.commitTime = value.toString();
- break;
- }
- case 1: {
- this.commitSeqNumber = value.toString();
- break;
- }
- case 2: {
- this.recordKey = value.toString();
- break;
- }
- case 3: {
- this.partitionPath = value.toString();
- break;
- }
- case 4: {
- this.fileName = value.toString();
- break;
- }
- default: throw new IllegalArgumentException("Not expected");
+ public void update(int ordinal, Object value) {
+ if (ordinal < metaFields.length) {
+ if (value instanceof UTF8String) {
+ metaFields[ordinal] = (UTF8String) value;
+ } else if (value instanceof String) {
+ metaFields[ordinal] = UTF8String.fromString((String) value);
+ } else {
+ throw new IllegalArgumentException(
+ String.format("Could not update the row at (%d) with value of type
(%s), either UTF8String or String are expected", ordinal,
value.getClass().getSimpleName()));
}
} else {
- row.update(i, value);
+ row.update(rebaseOrdinal(ordinal), value);
}
}
- private String getMetaColumnVal(int ordinal) {
- switch (ordinal) {
- case 0: {
- return commitTime;
- }
- case 1: {
- return commitSeqNumber;
- }
- case 2: {
- return recordKey;
- }
- case 3: {
- return partitionPath;
- }
- case 4: {
- return fileName;
- }
- default: throw new IllegalArgumentException("Not expected");
+ @Override
+ public boolean isNullAt(int ordinal) {
+ if (ordinal < metaFields.length) {
+ return metaFields[ordinal] == null;
}
+ return row.isNullAt(rebaseOrdinal(ordinal));
}
@Override
- public boolean isNullAt(int ordinal) {
+ public UTF8String getUTF8String(int ordinal) {
+ if (ordinal < HoodieRecord.HOODIE_META_COLUMNS.size()) {
+ return metaFields[ordinal];
+ }
+ return row.getUTF8String(rebaseOrdinal(ordinal));
+ }
+
+ @Override
+ public Object get(int ordinal, DataType dataType) {
if (ordinal < HoodieRecord.HOODIE_META_COLUMNS.size()) {
- return null == getMetaColumnVal(ordinal);
+ validateMetaFieldDataType(dataType);
+ return metaFields[ordinal];
}
- return row.isNullAt(ordinal);
+ return row.get(rebaseOrdinal(ordinal), dataType);
}
@Override
public boolean getBoolean(int ordinal) {
- return row.getBoolean(ordinal);
+ ruleOutMetaFieldsAccess(ordinal, Boolean.class);
+ return row.getBoolean(rebaseOrdinal(ordinal));
}
@Override
public byte getByte(int ordinal) {
- return row.getByte(ordinal);
+ ruleOutMetaFieldsAccess(ordinal, Byte.class);
+ return row.getByte(rebaseOrdinal(ordinal));
}
@Override
public short getShort(int ordinal) {
- return row.getShort(ordinal);
+ ruleOutMetaFieldsAccess(ordinal, Short.class);
+ return row.getShort(rebaseOrdinal(ordinal));
}
@Override
public int getInt(int ordinal) {
- return row.getInt(ordinal);
+ ruleOutMetaFieldsAccess(ordinal, Integer.class);
+ return row.getInt(rebaseOrdinal(ordinal));
}
@Override
public long getLong(int ordinal) {
- return row.getLong(ordinal);
+ ruleOutMetaFieldsAccess(ordinal, Long.class);
+ return row.getLong(rebaseOrdinal(ordinal));
}
@Override
public float getFloat(int ordinal) {
- return row.getFloat(ordinal);
+ ruleOutMetaFieldsAccess(ordinal, Float.class);
+ return row.getFloat(rebaseOrdinal(ordinal));
}
@Override
public double getDouble(int ordinal) {
- return row.getDouble(ordinal);
+ ruleOutMetaFieldsAccess(ordinal, Double.class);
+ return row.getDouble(rebaseOrdinal(ordinal));
}
@Override
public Decimal getDecimal(int ordinal, int precision, int scale) {
- return row.getDecimal(ordinal, precision, scale);
- }
-
- @Override
- public UTF8String getUTF8String(int ordinal) {
- if (ordinal < HoodieRecord.HOODIE_META_COLUMNS.size()) {
- return UTF8String.fromBytes(getMetaColumnVal(ordinal).getBytes());
- }
- return row.getUTF8String(ordinal);
- }
-
- @Override
- public String getString(int ordinal) {
- if (ordinal < HoodieRecord.HOODIE_META_COLUMNS.size()) {
- return new String(getMetaColumnVal(ordinal).getBytes());
- }
- return row.getString(ordinal);
+ ruleOutMetaFieldsAccess(ordinal, Decimal.class);
+ return row.getDecimal(rebaseOrdinal(ordinal), precision, scale);
}
@Override
public byte[] getBinary(int ordinal) {
- return row.getBinary(ordinal);
+ ruleOutMetaFieldsAccess(ordinal, Byte[].class);
+ return row.getBinary(rebaseOrdinal(ordinal));
}
@Override
public CalendarInterval getInterval(int ordinal) {
- return row.getInterval(ordinal);
+ ruleOutMetaFieldsAccess(ordinal, CalendarInterval.class);
+ return row.getInterval(rebaseOrdinal(ordinal));
}
@Override
public InternalRow getStruct(int ordinal, int numFields) {
- return row.getStruct(ordinal, numFields);
+ ruleOutMetaFieldsAccess(ordinal, InternalRow.class);
+ return row.getStruct(rebaseOrdinal(ordinal), numFields);
}
@Override
public ArrayData getArray(int ordinal) {
- return row.getArray(ordinal);
+ ruleOutMetaFieldsAccess(ordinal, ArrayData.class);
+ return row.getArray(rebaseOrdinal(ordinal));
}
@Override
public MapData getMap(int ordinal) {
- return row.getMap(ordinal);
+ ruleOutMetaFieldsAccess(ordinal, MapData.class);
+ return row.getMap(rebaseOrdinal(ordinal));
}
@Override
- public Object get(int ordinal, DataType dataType) {
- if (ordinal < HoodieRecord.HOODIE_META_COLUMNS.size()) {
- return UTF8String.fromBytes(getMetaColumnVal(ordinal).getBytes());
+ public InternalRow copy() {
+ return new HoodieInternalRow(Arrays.copyOf(metaFields, metaFields.length),
row.copy(), containsMetaFields);
+ }
+
+ private int rebaseOrdinal(int ordinal) {
+ // NOTE: In cases when source row does not contain meta fields, we will
have to
+ // rebase ordinal onto its indexes
+ return containsMetaFields ? ordinal : ordinal - metaFields.length;
+ }
+
+ private void validateMetaFieldDataType(DataType dataType) {
+ if (!dataType.sameType(StringType$.MODULE$)) {
+ throw new ClassCastException(String.format("Can not cast meta-field of
type UTF8String to %s", dataType.simpleString()));
}
- return row.get(ordinal, dataType);
}
- @Override
- public InternalRow copy() {
- return new HoodieInternalRow(commitTime, commitSeqNumber, recordKey,
partitionPath, fileName, row.copy());
+ private void ruleOutMetaFieldsAccess(int ordinal, Class<?> expectedDataType)
{
Review Comment:
It's a good callout — we should be vigilant to make sure we're not tripping
into easily avoidable perf traps. In this case, though, I don't think this
method will have much impact: it's a single conditional with a simple check that
will be inlined by the JVM (its cost won't be much). Later on, if we see it
showing up in the profiles, we can follow up and make validations optional, but
out of the gate I wouldn't try to optimize it out since its impact won't be
material.
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieUnsafeRowUtils.scala:
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types.{StructField, StructType}
+
+import scala.collection.mutable.ArrayBuffer
+
+object HoodieUnsafeRowUtils {
+
+ /**
+ * Fetches (nested) value w/in provided [[Row]] uniquely identified by the
provided nested-field path
+ * previously composed by [[composeNestedFieldPath]]
+ */
+ def getNestedRowValue(row: Row, nestedFieldPath: NestedFieldPath): Any = {
Review Comment:
This is brand-new code
##########
hudi-common/src/main/java/org/apache/hudi/keygen/KeyGenerator.java:
##########
@@ -52,7 +52,7 @@ protected KeyGenerator(TypedProperties config) {
* @return list of field names, when concatenated make up the record key.
*/
@PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
- public List<String> getRecordKeyFieldNames() {
Review Comment:
Good call — I think we can avoid making this breaking change.
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/unsafe/UTF8StringBuilder.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.unsafe;
+
+import org.apache.spark.unsafe.Platform;
+import org.apache.spark.unsafe.array.ByteArrayMethods;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * A helper class to write {@link UTF8String}s to an internal buffer and build
the concatenated
+ * {@link UTF8String} at the end.
+ */
+public class UTF8StringBuilder {
+
+ private static final int ARRAY_MAX =
ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH;
+
+ private byte[] buffer;
+ private int cursor = Platform.BYTE_ARRAY_OFFSET;
+
+ public UTF8StringBuilder() {
Review Comment:
This is cloned from Spark
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java:
##########
@@ -46,16 +47,27 @@
* field in the partition path, use field1:simple 3. If you want your table to
be non partitioned, simply leave it as blank.
*
* RecordKey is internally generated using either SimpleKeyGenerator or
ComplexKeyGenerator.
+ *
+ * @deprecated
*/
+@Deprecated
Review Comment:
Good call! I think initially I marked it for deprecation purely from the
standpoint that its implementation isn't really viable (it initializes a new
key-gen for every call to getRecordKey/getPartitionPath), but I didn't offer a
suggestion to replace it. Will address.
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieUnsafeRowUtils.scala:
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types.{StructField, StructType}
+
+import scala.collection.mutable.ArrayBuffer
+
+object HoodieUnsafeRowUtils {
+
+ /**
+ * Fetches (nested) value w/in provided [[Row]] uniquely identified by the
provided nested-field path
+ * previously composed by [[composeNestedFieldPath]]
+ */
+ def getNestedRowValue(row: Row, nestedFieldPath: NestedFieldPath): Any = {
+ var curRow = row
+ for (idx <- nestedFieldPath.parts.indices) {
+ val (ord, f) = nestedFieldPath.parts(idx)
+ if (curRow.isNullAt(ord)) {
+ // scalastyle:off return
+ if (f.nullable) return null
Review Comment:
Structurally they are very similar; the goal here was to:
1. Clean up the code and make it more concise (it just needs to do a simple
iteration)
2. Convert it to Scala to avoid cumbersome Java <> Scala interactions when
working with `Row`s
##########
hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BulkInsertDataInternalWriterHelper.java:
##########
@@ -157,7 +152,29 @@ public List<HoodieInternalWriteStatus> getWriteStatuses()
throws IOException {
return writeStatusList;
}
- public void abort() {
+ public void abort() {}
+
+ public void close() throws IOException {
+ for (HoodieRowCreateHandle rowCreateHandle : handles.values()) {
+ writeStatusList.add(rowCreateHandle.close());
+ }
+ handles.clear();
+ handle = null;
+ }
+
+ private UTF8String extractPartitionPath(InternalRow row) {
+ if (populateMetaFields) {
+ // In case meta-fields are materialized w/in the table itself, we can
just simply extract
+ // partition path from there
+ //
+ // NOTE: Helper keeps track of [[lastKnownPartitionPath]] as
[[UTF8String]] to avoid
+ // conversion from Catalyst internal representation into a
[[String]]
+ return row.getUTF8String(HoodieRecord.PARTITION_PATH_META_FIELD_POS);
+ } else if (keyGeneratorOpt.isPresent()) {
+ return keyGeneratorOpt.get().getPartitionPath(row, structType);
Review Comment:
Didn't miss it — it's actually one of the primary premises of this PR to get
rid of such special handling: special handling duplicates the logic across many
places (key-gen, bulk-insert handler, etc.), which sure enough leads to bugs
induced by code getting out of sync (for example, broken hive-style
partitioning/encoding in the bulk-insert path, etc.).
Keys/partition paths should be produced by a single source of truth, which in
this case is the key generator, and that should be done in a performant (i.e.
engine-specific) way — that's exactly what this PR tackles.
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/unsafe/UTF8StringBuilder.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.unsafe;
+
+import org.apache.spark.unsafe.Platform;
+import org.apache.spark.unsafe.array.ByteArrayMethods;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * A helper class to write {@link UTF8String}s to an internal buffer and build
the concatenated
+ * {@link UTF8String} at the end.
+ */
+public class UTF8StringBuilder {
+
+ private static final int ARRAY_MAX =
ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH;
+
+ private byte[] buffer;
+ private int cursor = Platform.BYTE_ARRAY_OFFSET;
+
+ public UTF8StringBuilder() {
Review Comment:
And it's covered by existing tests, though we don't have dedicated UTs for it
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]