This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new d15bdfcebd4 [HUDI-7937] Handle legacy writer requirements in
StreamSync and Clustering (#11534)
d15bdfcebd4 is described below
commit d15bdfcebd4e15308d5b79a2b90925004229aa4a
Author: Tim Brown <[email protected]>
AuthorDate: Tue Sep 17 09:29:16 2024 -0500
[HUDI-7937] Handle legacy writer requirements in StreamSync and Clustering
(#11534)
- Update StreamSync to infer whether row writer is allowed based on the
configs and target schema
- Update clustering to also infer whether row writer is allowed based on
the target schema and configs
- Fix clustering to add legacy writer parameter when required
---
.../org/apache/hudi/config/HoodieWriteConfig.java | 4 -
.../MultipleSparkJobExecutionStrategy.java | 8 +-
.../storage/row/HoodieRowParquetWriteSupport.java | 2 +-
.../org/apache/spark/sql/HoodieDataTypeUtils.scala | 85 ++++++++++++---
.../apache/spark/sql/TestHoodieDataTypeUtils.java | 115 +++++++++++++++++++++
.../java/org/apache/hudi/avro/HoodieAvroUtils.java | 65 ++++++++++++
.../hudi/common/config/HoodieStorageConfig.java | 2 +-
.../org/apache/hudi/avro/TestHoodieAvroUtils.java | 28 +++++
.../testsuite/HoodieDeltaStreamerWrapper.java | 2 +-
.../main/java/org/apache/hudi/DataSourceUtils.java | 38 -------
.../org/apache/hudi/HoodieSparkSqlWriter.scala | 2 +-
.../java/org/apache/hudi/TestDataSourceUtils.java | 54 ----------
.../TestHoodieSparkMergeOnReadTableClustering.java | 4 +-
.../functional/TestSparkSortAndSizeClustering.java | 27 ++++-
.../org/apache/hudi/internal/DefaultSource.java | 5 +-
.../apache/hudi/spark3/internal/DefaultSource.java | 4 +-
.../apache/hudi/utilities/streamer/StreamSync.java | 80 ++++++++------
.../streamer/TestStreamSyncUnitTests.java | 10 +-
18 files changed, 373 insertions(+), 162 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 964e84a04bf..15c16078b0f 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -2149,10 +2149,6 @@ public class HoodieWriteConfig extends HoodieConfig {
return getBoolean(HoodieStorageConfig.PARQUET_DICTIONARY_ENABLED);
}
- public String parquetWriteLegacyFormatEnabled() {
- return getString(HoodieStorageConfig.PARQUET_WRITE_LEGACY_FORMAT_ENABLED);
- }
-
public String parquetOutputTimestampType() {
return getString(HoodieStorageConfig.PARQUET_OUTPUT_TIMESTAMP_TYPE);
}
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
index 47ccd8700a8..57fc02960c2 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
@@ -72,6 +72,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.HoodieDataTypeUtils;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.sources.BaseRelation;
@@ -115,11 +116,16 @@ public abstract class MultipleSparkJobExecutionStrategy<T>
Math.min(clusteringPlan.getInputGroups().size(),
writeConfig.getClusteringMaxParallelism()),
new CustomizedThreadFactory("clustering-job-group", true));
try {
+ boolean canUseRowWriter =
getWriteConfig().getBooleanOrDefault("hoodie.datasource.write.row.writer.enable",
true)
+ && HoodieDataTypeUtils.canUseRowWriter(schema,
engineContext.hadoopConfiguration());
+ if (canUseRowWriter) {
+
HoodieDataTypeUtils.tryOverrideParquetWriteLegacyFormatProperty(writeConfig.getProps(),
schema);
+ }
// execute clustering for each group async and collect WriteStatus
Stream<HoodieData<WriteStatus>> writeStatusesStream = FutureUtils.allOf(
clusteringPlan.getInputGroups().stream()
.map(inputGroup -> {
- if
(getWriteConfig().getBooleanOrDefault("hoodie.datasource.write.row.writer.enable",
true)) {
+ if (canUseRowWriter) {
return runClusteringForGroupAsyncAsRow(inputGroup,
clusteringPlan.getStrategy().getStrategyParams(),
shouldPreserveMetadata,
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java
index d05abb5a5a0..5e45be76d46 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java
@@ -46,7 +46,7 @@ public class HoodieRowParquetWriteSupport extends
ParquetWriteSupport {
public HoodieRowParquetWriteSupport(Configuration conf, StructType
structType, Option<BloomFilter> bloomFilterOpt, HoodieConfig config) {
Configuration hadoopConf = new Configuration(conf);
- hadoopConf.set("spark.sql.parquet.writeLegacyFormat",
config.getStringOrDefault(HoodieStorageConfig.PARQUET_WRITE_LEGACY_FORMAT_ENABLED));
+ hadoopConf.set("spark.sql.parquet.writeLegacyFormat",
config.getStringOrDefault(HoodieStorageConfig.PARQUET_WRITE_LEGACY_FORMAT_ENABLED,
"false"));
hadoopConf.set("spark.sql.parquet.outputTimestampType",
config.getStringOrDefault(HoodieStorageConfig.PARQUET_OUTPUT_TIMESTAMP_TYPE));
hadoopConf.set("spark.sql.parquet.fieldId.write.enabled",
config.getStringOrDefault(PARQUET_FIELD_ID_WRITE_ENABLED));
setSchema(structType, hadoopConf);
diff --git
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieDataTypeUtils.scala
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieDataTypeUtils.scala
index 6017eca6739..00f4e8d7996 100644
---
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieDataTypeUtils.scala
+++
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieDataTypeUtils.scala
@@ -18,9 +18,17 @@
package org.apache.spark.sql
+import org.apache.avro.Schema
+import org.apache.hadoop.conf.Configuration
+import org.apache.hudi.AvroConversionUtils
+import org.apache.hudi.avro.HoodieAvroUtils
+import org.apache.hudi.common.config.{HoodieStorageConfig, TypedProperties}
+import org.apache.parquet.avro.AvroWriteSupport
import org.apache.spark.sql.types._
+import org.slf4j.LoggerFactory
object HoodieDataTypeUtils {
+ private val log = LoggerFactory.getLogger(getClass)
/**
* Parses provided [[jsonSchema]] into [[StructType]].
@@ -30,26 +38,71 @@ object HoodieDataTypeUtils {
def parseStructTypeFromJson(jsonSchema: String): StructType =
StructType.fromString(jsonSchema)
+ def canUseRowWriter(schema: Schema, conf: Configuration): Boolean = {
+ if (conf.getBoolean(AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE, true)) {
+ // if we can write lists with the old list structure, we can use row
writer regardless of decimal precision
+ true
+ } else if (!HoodieAvroUtils.hasSmallPrecisionDecimalField(schema)) {
+ true
+ } else {
+ // small precision decimals require the legacy write mode but lists and
maps require the new write mode when
+ // WRITE_OLD_LIST_STRUCTURE is false so we can only use row writer if
one is present and the other is not
+ if (HoodieAvroUtils.hasListOrMapField(schema)) {
+ log.warn("Cannot use row writer due to presence of list or map with a
small precision decimal field")
+ false
+ } else {
+ true
+ }
+ }
+ }
+
/**
- * Checks whether provided {@link DataType} contains {@link DecimalType}
whose scale is less than
- * {@link Decimal# MAX_LONG_DIGITS ( )}
+ * Checks whether default value (false) of
"hoodie.parquet.writelegacyformat.enabled" should be
+ * overridden in case:
+ *
+ * <ul>
+ * <li>Property has not been explicitly set by the writer</li>
+ * <li>Data schema contains {@code DecimalType} that would be affected by
it</li>
+ * </ul>
+ *
+ * If both of the aforementioned conditions are true, will override the
default value of the config
+ * (by essentially setting the value) to make sure that the produced Parquet
data files could be
+ * read by {@code AvroParquetReader}
+ *
+ * @param properties properties specified by the writer
+ * @param schema schema of the dataset being written
*/
- def hasSmallPrecisionDecimalType(sparkType: DataType): Boolean = {
- sparkType match {
- case st: StructType =>
- st.exists(f => hasSmallPrecisionDecimalType(f.dataType))
-
- case map: MapType =>
- hasSmallPrecisionDecimalType(map.keyType) ||
- hasSmallPrecisionDecimalType(map.valueType)
-
- case at: ArrayType =>
- hasSmallPrecisionDecimalType(at.elementType)
+ def tryOverrideParquetWriteLegacyFormatProperty(properties:
java.util.Map[String, String], schema: StructType): Unit = {
+ tryOverrideParquetWriteLegacyFormatProperty(Left(properties),
+ AvroConversionUtils.convertStructTypeToAvroSchema(schema, "struct",
"hoodie.source"))
+ }
- case dt: DecimalType =>
- dt.precision < Decimal.MAX_LONG_DIGITS
+ def tryOverrideParquetWriteLegacyFormatProperty(properties: TypedProperties,
schema: Schema): Unit = {
+ tryOverrideParquetWriteLegacyFormatProperty(Right(properties), schema)
+ }
- case _ => false
+ private def tryOverrideParquetWriteLegacyFormatProperty(properties:
Either[java.util.Map[String, String], TypedProperties], schema: Schema): Unit =
{
+ val legacyFormatEnabledProp = properties match {
+ case Left(map) =>
map.get(HoodieStorageConfig.PARQUET_WRITE_LEGACY_FORMAT_ENABLED.key)
+ case Right(typedProps) =>
typedProps.get(HoodieStorageConfig.PARQUET_WRITE_LEGACY_FORMAT_ENABLED.key)
+ }
+ if (legacyFormatEnabledProp == null &&
HoodieAvroUtils.hasSmallPrecisionDecimalField(schema)) {
+ // ParquetWriteSupport writes DecimalType to parquet as INT32/INT64 when
the precision of the decimal type
+ // does not exceed {@code Decimal.MAX_LONG_DIGITS}, but {@code
AvroParquetReader}, which is used by
+ // {@code HoodieParquetReader}, does not support DecimalType encoded as
INT32/INT64.
+ //
+ // To work around this problem we check whether
+ // - Schema contains any decimals that could be encoded as INT32/INT64
+ // - {@code HoodieStorageConfig.PARQUET_WRITE_LEGACY_FORMAT_ENABLED}
has not been explicitly
+ // set by the writer
+ //
+ // If both of these conditions are true, then we override the default
value of {@code
+ // HoodieStorageConfig.PARQUET_WRITE_LEGACY_FORMAT_ENABLED} and set it
to "true"
+ log.warn("Small Decimal Type found in the persisted schema, reverting
default value of 'hoodie.parquet.writelegacyformat.enabled' to true")
+ properties match {
+ case Left(map) =>
map.put(HoodieStorageConfig.PARQUET_WRITE_LEGACY_FORMAT_ENABLED.key, "true")
+ case Right(typedProps) =>
typedProps.put(HoodieStorageConfig.PARQUET_WRITE_LEGACY_FORMAT_ENABLED.key,
"true")
+ }
}
}
}
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/spark/sql/TestHoodieDataTypeUtils.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/spark/sql/TestHoodieDataTypeUtils.java
new file mode 100644
index 00000000000..285ccd78e11
--- /dev/null
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/spark/sql/TestHoodieDataTypeUtils.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql;
+
+import org.apache.hudi.common.config.HoodieStorageConfig;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.avro.AvroWriteSupport;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class TestHoodieDataTypeUtils {
+ private static final Schema SMALL_DECIMAL_SCHEMA = LogicalTypes.decimal(18,
10).addToSchema(Schema.createFixed("smallDec", null, "org.apache.hudi.test",
9));
+ private static final Schema LARGE_DECIMAL_SCHEMA = LogicalTypes.decimal(20,
10).addToSchema(Schema.createFixed("largeDec", null, "org.apache.hudi.test",
9));
+
+ private static Stream<Arguments> canUseRowWriterCases() {
+
+ Schema listSchema = Schema.createArray(Schema.create(Schema.Type.INT));
+ Schema mapSchema = Schema.createMap(Schema.create(Schema.Type.INT));
+
+ Schema schemaWithSmallDecimal =
Schema.createRecord("schemaWithSmallDecimal", null, null, false,
+ Collections.singletonList(new Schema.Field("smallDecimal",
SMALL_DECIMAL_SCHEMA, null, null)));
+ Schema schemaWithSmallDecimalAndList =
Schema.createRecord("schemaWithSmallDecimalAndList", null, null, false,
+ Arrays.asList(new Schema.Field("smallDecimal", SMALL_DECIMAL_SCHEMA,
null, null), new Schema.Field("intField", Schema.create(Schema.Type.INT), null,
null),
+ new Schema.Field("listField", listSchema, null, null)));
+ Schema schemaWithSmallDecimalAndMap =
Schema.createRecord("schemaWithSmallDecimalAndMap", null, null, false,
+ Arrays.asList(new Schema.Field("smallDecimal", SMALL_DECIMAL_SCHEMA,
null, null), new Schema.Field("intField", Schema.create(Schema.Type.INT), null,
null),
+ new Schema.Field("mapField", mapSchema,null, null)));
+ Schema schemaWithLargeDecimalAndList =
Schema.createRecord("schemaWithLargeDecimalAndList", null, null, false,
+ Arrays.asList(new Schema.Field("largeDecimal", LARGE_DECIMAL_SCHEMA,
null, null), new Schema.Field("intField", Schema.create(Schema.Type.INT), null,
null),
+ new Schema.Field("listField", listSchema, null, null)));
+ Schema schemaWithLargeDecimalAndMap =
Schema.createRecord("schemaWithLargeDecimalAndMap", null, null, false,
+ Arrays.asList(new Schema.Field("largeDecimal", LARGE_DECIMAL_SCHEMA,
null, null), new Schema.Field("intField", Schema.create(Schema.Type.INT), null,
null),
+ new Schema.Field("mapField", mapSchema, null, null)));
+ Schema schemaWithoutSpecialTypes = Schema.createRecord("schemaWithInt",
null, null, false,
+ Collections.singletonList(new Schema.Field("intField",
Schema.create(Schema.Type.INT), null, null)));
+
+ Configuration configurationWithLegacyListFormat = new Configuration(false);
+
configurationWithLegacyListFormat.set(AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE,
"true");
+
+ Configuration configurationWithoutLegacyListFormat = new
Configuration(false);
+
configurationWithoutLegacyListFormat.set(AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE,
"false");
+
+ return Stream.of(
+ Arguments.of(schemaWithoutSpecialTypes,
configurationWithoutLegacyListFormat, true),
+ Arguments.of(schemaWithSmallDecimal,
configurationWithoutLegacyListFormat, true),
+ Arguments.of(schemaWithLargeDecimalAndMap,
configurationWithoutLegacyListFormat, true),
+ Arguments.of(schemaWithSmallDecimalAndMap,
configurationWithoutLegacyListFormat, false),
+ Arguments.of(schemaWithLargeDecimalAndMap,
configurationWithoutLegacyListFormat, true),
+ Arguments.of(schemaWithSmallDecimalAndList,
configurationWithLegacyListFormat, true),
+ Arguments.of(schemaWithSmallDecimalAndList,
configurationWithoutLegacyListFormat, false),
+ Arguments.of(schemaWithLargeDecimalAndList,
configurationWithoutLegacyListFormat, true),
+ Arguments.of(schemaWithLargeDecimalAndList,
configurationWithLegacyListFormat, true));
+ }
+
+ @ParameterizedTest
+ @MethodSource("canUseRowWriterCases")
+ void testCanUseRowWriter(Schema schema, Configuration conf, boolean
expected) {
+ assertEquals(expected, HoodieDataTypeUtils.canUseRowWriter(schema, conf));
+ }
+
+ private static Stream<Arguments>
testAutoModifyParquetWriteLegacyFormatParameterParams() {
+ return Arrays.stream(new Object[][] {
+ {true, null, true}, {false, null, null},
+ {true, false, false}, {true, true, true},
+ {false, true, true}, {false, false, false}})
+ // wrap each {smallDecimal, propValue, expectedPropValue} row as the arguments of one test invocation
+ .map(Arguments::of);
+ }
+
+ @ParameterizedTest
+ @MethodSource("testAutoModifyParquetWriteLegacyFormatParameterParams")
+ void testAutoModifyParquetWriteLegacyFormatParameter(boolean smallDecimal,
Boolean propValue, Boolean expectedPropValue) {
+ Schema decimalType = smallDecimal ? SMALL_DECIMAL_SCHEMA :
LARGE_DECIMAL_SCHEMA;
+ Schema schema = Schema.createRecord("test", null, null, false,
+ Collections.singletonList(new Schema.Field("decimalField",
decimalType, null, null)));
+ TypedProperties options = propValue != null
+ ?
TypedProperties.fromMap(Collections.singletonMap(HoodieStorageConfig.PARQUET_WRITE_LEGACY_FORMAT_ENABLED.key(),
String.valueOf(propValue)))
+ : new TypedProperties();
+
+ HoodieDataTypeUtils.tryOverrideParquetWriteLegacyFormatProperty(options,
schema);
+ Boolean finalPropValue =
+
Option.ofNullable(options.get(HoodieStorageConfig.PARQUET_WRITE_LEGACY_FORMAT_ENABLED.key()))
+ .map(value -> Boolean.parseBoolean(value.toString()))
+ .orElse(null);
+ assertEquals(expectedPropValue, finalPropValue);
+ }
+}
\ No newline at end of file
diff --git
a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index 078f72d6a9f..1b005091695 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -93,6 +93,7 @@ import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.TimeZone;
+import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@@ -1144,6 +1145,70 @@ public class HoodieAvroUtils {
throw new HoodieAvroSchemaException(String.format("cannot support rewrite
value for schema type: %s since the old schema type is: %s", newSchema,
oldSchema));
}
+ /**
+ * Checks whether the provided schema contains a decimal with a precision
less than or equal to 18,
+ * which allows the decimal to be stored as int/long instead of a fixed size
byte array in
+ * <a
href="https://github.com/apache/parquet-format/blob/master/LogicalTypes.md">parquet
logical types</a>
+ * @param schema the input schema to search
+ * @return true if the schema contains a small precision decimal field and
false otherwise
+ */
+ public static boolean hasSmallPrecisionDecimalField(Schema schema) {
+ return hasDecimalWithCondition(schema,
HoodieAvroUtils::isSmallPrecisionDecimalField);
+ }
+
+ private static boolean hasDecimalWithCondition(Schema schema,
Function<Decimal, Boolean> condition) {
+ switch (schema.getType()) {
+ case RECORD:
+ for (Field field : schema.getFields()) {
+ if (hasDecimalWithCondition(field.schema(), condition)) {
+ return true;
+ }
+ }
+ return false;
+ case ARRAY:
+ return hasDecimalWithCondition(schema.getElementType(), condition);
+ case MAP:
+ return hasDecimalWithCondition(schema.getValueType(), condition);
+ case UNION:
+ return hasDecimalWithCondition(getActualSchemaFromUnion(schema, null),
condition);
+ default:
+ if (schema.getLogicalType() instanceof LogicalTypes.Decimal) {
+ Decimal decimal = (Decimal) schema.getLogicalType();
+ return condition.apply(decimal);
+ } else {
+ return false;
+ }
+ }
+ }
+
+ private static boolean isSmallPrecisionDecimalField(Decimal decimal) {
+ return decimal.getPrecision() <= 18;
+ }
+
+ /**
+ * Checks whether the provided schema contains a list or map field.
+ * @param schema input
+ * @return true if a list or map is present, false otherwise
+ */
+ public static boolean hasListOrMapField(Schema schema) {
+ switch (schema.getType()) {
+ case RECORD:
+ for (Field field : schema.getFields()) {
+ if (hasListOrMapField(field.schema())) {
+ return true;
+ }
+ }
+ return false;
+ case ARRAY:
+ case MAP:
+ return true;
+ case UNION:
+ return hasListOrMapField(getActualSchemaFromUnion(schema, null));
+ default:
+ return false;
+ }
+ }
+
/**
* Avro does not support type promotion from numbers to string. This
function returns true if
* it will be necessary to rewrite the record to support this promotion.
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java
index 235754e624b..fcf3bb033d8 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java
@@ -129,7 +129,7 @@ public class HoodieStorageConfig extends HoodieConfig {
public static final ConfigProperty<String>
PARQUET_WRITE_LEGACY_FORMAT_ENABLED = ConfigProperty
.key("hoodie.parquet.writelegacyformat.enabled")
- .defaultValue("false")
+ .noDefaultValue()
.markAdvanced()
.withDocumentation("Sets spark.sql.parquet.writeLegacyFormat. If true,
data will be written in a way of Spark 1.4 and earlier. "
+ "For example, decimal values will be written in Parquet's
fixed-length byte array format which other systems such as Apache Hive and
Apache Impala use. "
diff --git
a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
index f883870440f..776fd6bbf0b 100644
--- a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
@@ -82,6 +82,7 @@ import static
org.apache.hudi.avro.HoodieAvroUtils.wrapValueIntoAvro;
import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -731,4 +732,31 @@ public class TestHoodieAvroUtils {
assertArrayEquals(new Object[] {"partition1", "val1", 3.5},
sortColumnValues);
}
}
+
+ @Test
+ void testHasListOrMapField() {
+ Schema nestedList = Schema.createRecord("nestedList", null, null, false,
Arrays.asList(
+ new Schema.Field("intField", Schema.create(Schema.Type.INT), null,
null),
+ new Schema.Field("nested", Schema.createRecord("nestedSchema", null,
null, false, Collections.singletonList(
+ new Schema.Field("listField",
Schema.createArray(Schema.create(Schema.Type.INT)), null, null)
+ )), null, null)
+ ));
+ Schema nestedMap = Schema.createRecord("nestedMap", null, null, false,
Arrays.asList(
+ new Schema.Field("intField", Schema.create(Schema.Type.INT), null,
null),
+ new Schema.Field("nested",
Schema.createUnion(Schema.create(Schema.Type.NULL),
+ Schema.createRecord("nestedSchema", null, null, false,
+ Collections.singletonList(new Schema.Field("mapField",
Schema.createMap(Schema.create(Schema.Type.INT)), null, null)
+ ))), null, null)
+ ));
+ assertTrue(HoodieAvroUtils.hasListOrMapField(nestedList));
+ assertTrue(HoodieAvroUtils.hasListOrMapField(nestedMap));
+ assertFalse(HoodieAvroUtils.hasListOrMapField(new
Schema.Parser().parse(EXAMPLE_SCHEMA)));
+ }
+
+ @Test
+ void testHasSmallPrecisionDecimalField() {
+ assertTrue(HoodieAvroUtils.hasSmallPrecisionDecimalField(new
Schema.Parser().parse(SCHEMA_WITH_DECIMAL_FIELD)));
+ assertFalse(HoodieAvroUtils.hasSmallPrecisionDecimalField(new
Schema.Parser().parse(SCHEMA_WITH_AVRO_TYPES)));
+ assertFalse(HoodieAvroUtils.hasSmallPrecisionDecimalField(new
Schema.Parser().parse(EXAMPLE_SCHEMA)));
+ }
}
diff --git
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java
index 184f73402a3..81c6a3dc8d3 100644
---
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java
+++
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java
@@ -86,7 +86,7 @@ public class HoodieDeltaStreamerWrapper extends
HoodieDeltaStreamer {
.setBasePath(service.getCfg().targetBasePath)
.build();
String instantTime = InProcessTimeGenerator.createNewInstantTime();
- InputBatch inputBatch = service.readFromSource(instantTime, metaClient);
+ InputBatch inputBatch = service.readFromSource(instantTime,
metaClient).getLeft();
return Pair.of(inputBatch.getSchemaProvider(),
Pair.of(inputBatch.getCheckpointForNextBatch(), (JavaRDD<HoodieRecord>)
inputBatch.getBatch().get()));
}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
index c86e2c908c3..e794c723185 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
@@ -22,7 +22,6 @@ import org.apache.hudi.client.HoodieWriteResult;
import org.apache.hudi.client.SparkRDDReadClient;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
-import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieKey;
@@ -48,9 +47,7 @@ import org.apache.avro.generic.GenericRecord;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.HoodieDataTypeUtils;
import org.apache.spark.sql.Row;
-import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -303,39 +300,4 @@ public class DataSourceUtils {
HoodieWriteConfig.newBuilder().withPath(parameters.get("path")).withProps(parameters).build();
return dropDuplicates(jssc, incomingHoodieRecords, writeConfig);
}
-
- /**
- * Checks whether default value (false) of
"hoodie.parquet.writelegacyformat.enabled" should be
- * overridden in case:
- *
- * <ul>
- * <li>Property has not been explicitly set by the writer</li>
- * <li>Data schema contains {@code DecimalType} that would be affected by
it</li>
- * </ul>
- *
- * If both of the aforementioned conditions are true, will override the
default value of the config
- * (by essentially setting the value) to make sure that the produced Parquet
data files could be
- * read by {@code AvroParquetReader}
- *
- * @param properties properties specified by the writer
- * @param schema schema of the dataset being written
- */
- public static void tryOverrideParquetWriteLegacyFormatProperty(Map<String,
String> properties, StructType schema) {
- if (HoodieDataTypeUtils.hasSmallPrecisionDecimalType(schema)
- &&
properties.get(HoodieStorageConfig.PARQUET_WRITE_LEGACY_FORMAT_ENABLED.key())
== null) {
- // ParquetWriteSupport writes DecimalType to parquet as INT32/INT64 when
the scale of decimalType
- // is less than {@code Decimal.MAX_LONG_DIGITS}, but {@code
AvroParquetReader} which is used by
- // {@code HoodieParquetReader} does not support DecimalType encoded as
INT32/INT64 as.
- //
- // To work this problem around we're checking whether
- // - Schema contains any decimals that could be encoded as INT32/INT64
- // - {@code HoodieStorageConfig.PARQUET_WRITE_LEGACY_FORMAT_ENABLED}
has not been explicitly
- // set by the writer
- //
- // If both of these conditions are true, then we override the default
value of {@code
- // HoodieStorageConfig.PARQUET_WRITE_LEGACY_FORMAT_ENABLED} and set it
to "true"
- LOG.warn("Small Decimal Type found in the persisted schema, reverting
default value of 'hoodie.parquet.writelegacyformat.enabled' to true");
-
properties.put(HoodieStorageConfig.PARQUET_WRITE_LEGACY_FORMAT_ENABLED.key(),
"true");
- }
- }
}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 351f68709ff..48d8dec474a 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -26,7 +26,6 @@ import org.apache.hadoop.hive.shims.ShimLoader
import
org.apache.hudi.AutoRecordKeyGenerationUtils.mayBeValidateParamsForAutoGenerationOfRecordKeys
import org.apache.hudi.AvroConversionUtils.{convertAvroSchemaToStructType,
convertStructTypeToAvroSchema, getAvroRecordNameAndNamespace}
import
org.apache.hudi.DataSourceOptionsHelper.fetchMissingWriteConfigsFromTableConfig
-import
org.apache.hudi.DataSourceUtils.tryOverrideParquetWriteLegacyFormatProperty
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.HoodieConversionUtils.{toProperties, toScalaOption}
import org.apache.hudi.HoodieSparkSqlWriter.StreamingWriteParams
@@ -68,6 +67,7 @@ import org.apache.hudi.sync.common.util.SyncUtilHelpers
import
org.apache.hudi.sync.common.util.SyncUtilHelpers.getHoodieMetaSyncException
import org.apache.hudi.util.SparkKeyGenUtils
import org.apache.spark.api.java.JavaSparkContext
+import
org.apache.spark.sql.HoodieDataTypeUtils.tryOverrideParquetWriteLegacyFormatProperty
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogTable
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java
index 6026a47c697..365386a9c8b 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java
@@ -20,7 +20,6 @@ package org.apache.hudi;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.client.SparkRDDWriteClient;
-import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
import org.apache.hudi.common.model.HoodieRecord;
@@ -46,18 +45,9 @@ import org.apache.avro.generic.IndexedRecord;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
-import org.apache.spark.sql.types.DecimalType;
-import org.apache.spark.sql.types.DecimalType$;
-import org.apache.spark.sql.types.Metadata;
-import org.apache.spark.sql.types.StructField;
-import org.apache.spark.sql.types.StructType;
-import org.apache.spark.sql.types.StructType$;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.Arguments;
-import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
@@ -67,13 +57,9 @@ import java.io.IOException;
import java.math.BigDecimal;
import java.time.LocalDate;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
-import java.util.Map;
-import java.util.stream.Stream;
-import static org.apache.hudi.DataSourceUtils.tryOverrideParquetWriteLegacyFormatProperty;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.instanceOf;
@@ -90,9 +76,6 @@ import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
public class TestDataSourceUtils {
- private static final String HIVE_DATABASE = "testdb1";
- private static final String HIVE_TABLE = "hive_trips";
-
@Mock
private SparkRDDWriteClient hoodieWriteClient;
@@ -294,43 +277,6 @@ public class TestDataSourceUtils {
}
}
- @ParameterizedTest
- @MethodSource("testAutoModifyParquetWriteLegacyFormatParameterParams")
- public void testAutoModifyParquetWriteLegacyFormatParameter(boolean smallDecimal, Boolean propValue, Boolean expectedPropValue) {
- DecimalType decimalType;
- if (smallDecimal) {
- decimalType = DecimalType$.MODULE$.apply(10, 2);
- } else {
- decimalType = DecimalType$.MODULE$.apply(38, 10);
- }
-
- StructType structType = StructType$.MODULE$.apply(
- Arrays.asList(
- StructField.apply("d1", decimalType, false, Metadata.empty())
- )
- );
-
- Map<String, String> options = propValue != null
- ? Collections.singletonMap(HoodieStorageConfig.PARQUET_WRITE_LEGACY_FORMAT_ENABLED.key(), String.valueOf(propValue))
- : new HashMap<>();
-
- tryOverrideParquetWriteLegacyFormatProperty(options, structType);
-
- Boolean finalPropValue =
- Option.ofNullable(options.get(HoodieStorageConfig.PARQUET_WRITE_LEGACY_FORMAT_ENABLED.key()))
- .map(Boolean::parseBoolean)
- .orElse(null);
- assertEquals(expectedPropValue, finalPropValue);
- }
-
- private static Stream<Arguments> testAutoModifyParquetWriteLegacyFormatParameterParams() {
- return Arrays.stream(new Object[][] {
- {true, null, true}, {false, null, null},
- {true, false, false}, {true, true, true},
- {false, true, true}, {false, false, false}
- }).map(Arguments::of);
- }
-
@Test
public void testSerHoodieMetadataPayload() throws IOException {
String partitionPath = "2022/10/01";
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieSparkMergeOnReadTableClustering.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieSparkMergeOnReadTableClustering.java
index f56e0238f62..67b0b929d6c 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieSparkMergeOnReadTableClustering.java
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieSparkMergeOnReadTableClustering.java
@@ -241,9 +241,7 @@ class TestHoodieSparkMergeOnReadTableClustering extends
SparkClientFunctionalTes
HoodieWriteConfig cfg,
HoodieTestDataGenerator dataGen,
boolean clusteringAsRow) {
- if (clusteringAsRow) {
- client.getConfig().setValue(DataSourceWriteOptions.ENABLE_ROW_WRITER(), "true");
- }
+ client.getConfig().setValue(DataSourceWriteOptions.ENABLE_ROW_WRITER(), Boolean.toString(clusteringAsRow));
client.cluster(clusteringCommitTime, true);
metaClient = HoodieTableMetaClient.reload(metaClient);
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkSortAndSizeClustering.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkSortAndSizeClustering.java
index e748c7ace53..47a496d1a78 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkSortAndSizeClustering.java
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkSortAndSizeClustering.java
@@ -31,15 +31,19 @@ import
org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ParquetUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.cluster.ClusteringPlanPartitionFilterMode;
import org.apache.hudi.testutils.HoodieSparkClientTestHarness;
import org.apache.hudi.testutils.MetadataMergeWriteStatus;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
@@ -54,6 +58,8 @@ import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
public class TestSparkSortAndSizeClustering extends
HoodieSparkClientTestHarness {
@@ -115,17 +121,30 @@ public class TestSparkSortAndSizeClustering extends
HoodieSparkClientTestHarness
metaClient,
HoodieTimeline.getClusteringCommitRequestedInstant(clusteringTime)).map(Pair::getRight).get();
List<HoodieClusteringGroup> inputGroups = plan.getInputGroups();
- Assertions.assertEquals(1, inputGroups.size(), "Clustering plan will contain 1 input group");
+ assertEquals(1, inputGroups.size(), "Clustering plan will contain 1 input group");
Integer outputFileGroups = plan.getInputGroups().get(0).getNumOutputFileGroups();
- Assertions.assertEquals(2, outputFileGroups, "Clustering plan will generate 2 output groups");
+ assertEquals(2, outputFileGroups, "Clustering plan will generate 2 output groups");
HoodieWriteMetadata writeMetadata = writeClient.cluster(clusteringTime, true);
List<HoodieWriteStat> writeStats = (List<HoodieWriteStat>)writeMetadata.getWriteStats().get();
- Assertions.assertEquals(2, writeStats.size(), "Clustering should write 2 files");
+ assertEquals(2, writeStats.size(), "Clustering should write 2 files");
List<Row> rows = readRecords();
- Assertions.assertEquals(numRecords, rows.size());
+ assertEquals(numRecords, rows.size());
+ validateDecimalTypeAfterClustering(writeStats);
+ }
+
+ // Validate that clustering produces decimals in legacy format
+ private void validateDecimalTypeAfterClustering(List<HoodieWriteStat> writeStats) {
+ writeStats.stream().map(writeStat -> new StoragePath(metaClient.getBasePath(), writeStat.getPath())).forEach(writtenPath -> {
+ MessageType schema = ParquetUtils.readMetadata(storage, writtenPath)
+ .getFileMetaData().getSchema();
+ int index = schema.getFieldIndex("height");
+ Type decimalType = schema.getFields().get(index);
+ assertEquals("DECIMAL", decimalType.getOriginalType().toString());
+ assertEquals("FIXED_LEN_BYTE_ARRAY", decimalType.asPrimitiveType().getPrimitiveTypeName().toString());
+ });
}
private List<WriteStatus> writeData(String commitTime, int totalRecords,
boolean doCommit) {
diff --git
a/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/DefaultSource.java
b/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/DefaultSource.java
index 3b3b8eafb86..f46139c09c8 100644
---
a/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/DefaultSource.java
+++
b/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/DefaultSource.java
@@ -23,6 +23,7 @@ import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.config.HoodieInternalConfig;
import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.spark.sql.HoodieDataTypeUtils;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.sources.DataSourceRegister;
import org.apache.spark.sql.sources.v2.DataSourceOptions;
@@ -36,8 +37,6 @@ import org.apache.spark.sql.types.StructType;
import java.util.Map;
import java.util.Optional;
-import static
org.apache.hudi.DataSourceUtils.tryOverrideParquetWriteLegacyFormatProperty;
-
/**
* DataSource V2 implementation for managing internal write logic. Only called
internally.
*/
@@ -69,7 +68,7 @@ public class DefaultSource extends BaseDefaultSource
implements DataSourceV2,
HoodieTableConfig.POPULATE_META_FIELDS.defaultValue());
Map<String, String> properties = options.asMap();
// Auto set the value of "hoodie.parquet.writelegacyformat.enabled"
- tryOverrideParquetWriteLegacyFormatProperty(properties, schema);
+ HoodieDataTypeUtils.tryOverrideParquetWriteLegacyFormatProperty(properties, schema);
// 1st arg to createHoodieConfig is not really required to be set. but
passing it anyways.
HoodieWriteConfig config =
DataSourceUtils.createHoodieConfig(options.get(HoodieWriteConfig.AVRO_SCHEMA_STRING.key()).get(),
path, tblName, properties);
boolean arePartitionRecordsSorted =
HoodieInternalConfig.getBulkInsertIsPartitionRecordsSorted(
diff --git
a/hudi-spark-datasource/hudi-spark3-common/src/main/java/org/apache/hudi/spark3/internal/DefaultSource.java
b/hudi-spark-datasource/hudi-spark3-common/src/main/java/org/apache/hudi/spark3/internal/DefaultSource.java
index 233fb20354d..01da1490cf4 100644
---
a/hudi-spark-datasource/hudi-spark3-common/src/main/java/org/apache/hudi/spark3/internal/DefaultSource.java
+++
b/hudi-spark-datasource/hudi-spark3-common/src/main/java/org/apache/hudi/spark3/internal/DefaultSource.java
@@ -35,8 +35,6 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap;
import java.util.HashMap;
import java.util.Map;
-import static
org.apache.hudi.DataSourceUtils.tryOverrideParquetWriteLegacyFormatProperty;
-
/**
* DataSource V2 implementation for managing internal write logic. Only called
internally.
* This class is only compatible with datasource V2 API in Spark 3.
@@ -61,7 +59,7 @@ public class DefaultSource extends BaseDefaultSource
implements TableProvider {
// Create a new map as the properties is an unmodifiableMap on Spark 3.2.0
Map<String, String> newProps = new HashMap<>(properties);
// Auto set the value of "hoodie.parquet.writelegacyformat.enabled"
- tryOverrideParquetWriteLegacyFormatProperty(newProps, schema);
+ HoodieDataTypeUtils.tryOverrideParquetWriteLegacyFormatProperty(newProps, schema);
// 1st arg to createHoodieConfig is not really required to be set. but
passing it anyways.
HoodieWriteConfig config =
DataSourceUtils.createHoodieConfig(newProps.get(HoodieWriteConfig.AVRO_SCHEMA_STRING.key()),
path, tblName, newProps);
return new HoodieDataSourceInternalTable(instantTime, config, schema,
getSparkSession(),
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
index 43f227ac78a..a501e9605b8 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
@@ -112,6 +112,7 @@ import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.HoodieDataTypeUtils;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
@@ -259,14 +260,12 @@ public class StreamSync implements Serializable,
Closeable {
private final boolean autoGenerateRecordKeys;
- private final boolean useRowWriter;
-
@VisibleForTesting
StreamSync(HoodieStreamer.Config cfg, SparkSession sparkSession,
TypedProperties props, HoodieSparkEngineContext
hoodieSparkContext, HoodieStorage storage, Configuration conf,
Function<SparkRDDWriteClient, Boolean>
onInitializingHoodieWriteClient, SchemaProvider userProvidedSchemaProvider,
Option<BaseErrorTableWriter> errorTableWriter,
SourceFormatAdapter formatAdapter, Option<Transformer> transformer,
- boolean useRowWriter, boolean autoGenerateRecordKeys) {
+ boolean autoGenerateRecordKeys) {
this.cfg = cfg;
this.hoodieSparkContext = hoodieSparkContext;
this.sparkSession = sparkSession;
@@ -282,7 +281,6 @@ public class StreamSync implements Serializable, Closeable {
this.errorTableWriter = errorTableWriter;
this.formatAdapter = formatAdapter;
this.transformer = transformer;
- this.useRowWriter = useRowWriter;
}
@Deprecated
@@ -327,9 +325,6 @@ public class StreamSync implements Serializable, Closeable {
Supplier<Option<Schema>> schemaSupplier = schemaProvider == null ?
Option::empty : () -> Option.ofNullable(schemaProvider.getSourceSchema());
this.transformer =
UtilHelpers.createTransformer(Option.ofNullable(cfg.transformerClassNames),
schemaSupplier, this.errorTableWriter.isPresent());
- // enable row writer only when operation is BULK_INSERT, and source is ROW
type and if row writer is not explicitly disabled.
- this.useRowWriter = this.cfg.operation == WriteOperationType.BULK_INSERT
&& source.getSourceType() == Source.SourceType.ROW
- &&
this.props.getBoolean(DataSourceWriteOptions.ENABLE_ROW_WRITER().key(), false);
}
/**
@@ -443,9 +438,11 @@ public class StreamSync implements Serializable, Closeable
{
.build();
String instantTime = metaClient.createNewInstantTime();
- InputBatch inputBatch = readFromSource(instantTime, metaClient);
+ Pair<InputBatch, Boolean> inputBatchAndUseRowWriter =
readFromSource(instantTime, metaClient);
- if (inputBatch != null) {
+ if (inputBatchAndUseRowWriter != null) {
+ InputBatch inputBatch = inputBatchAndUseRowWriter.getLeft();
+ boolean useRowWriter = inputBatchAndUseRowWriter.getRight();
// this is the first input batch. If schemaProvider not set, use it and
register Avro Schema and start
// compactor
@@ -489,7 +486,7 @@ public class StreamSync implements Serializable, Closeable {
}
}
- result = writeToSinkAndDoMetaSync(instantTime, inputBatch, metrics,
overallTimerContext);
+ result = writeToSinkAndDoMetaSync(instantTime, inputBatch, useRowWriter,
metrics, overallTimerContext);
}
// refresh schemas if need be before next batch
if (schemaProvider != null) {
@@ -518,10 +515,10 @@ public class StreamSync implements Serializable,
Closeable {
/**
* Read from Upstream Source and apply transformation if needed.
*
- * @return Pair<InputBatch and Boolean> Input data read from upstream source, and boolean is true if empty.
+ * @return Pair<InputBatch and Boolean> Input data read from upstream source, and boolean is true if the result should use the row writer path.
* @throws Exception in case of any Exception
*/
- public InputBatch readFromSource(String instantTime, HoodieTableMetaClient metaClient) throws IOException {
+ public Pair<InputBatch, Boolean> readFromSource(String instantTime, HoodieTableMetaClient metaClient) throws IOException {
// Retrieve the previous round checkpoints, if any
Option<String> resumeCheckpointStr = Option.empty();
if (commitsTimelineOpt.isPresent()) {
@@ -536,7 +533,7 @@ public class StreamSync implements Serializable, Closeable {
int maxRetryCount = cfg.retryOnSourceFailures ? cfg.maxRetryCount : 1;
int curRetryCount = 0;
- InputBatch sourceDataToSync = null;
+ Pair<InputBatch, Boolean> sourceDataToSync = null;
while (curRetryCount++ < maxRetryCount && sourceDataToSync == null) {
try {
sourceDataToSync =
fetchFromSourceAndPrepareRecords(resumeCheckpointStr, instantTime, metaClient);
@@ -556,7 +553,7 @@ public class StreamSync implements Serializable, Closeable {
return sourceDataToSync;
}
- private InputBatch fetchFromSourceAndPrepareRecords(Option<String>
resumeCheckpointStr, String instantTime,
+ private Pair<InputBatch, Boolean>
fetchFromSourceAndPrepareRecords(Option<String> resumeCheckpointStr, String
instantTime,
HoodieTableMetaClient metaClient) {
hoodieSparkContext.setJobStatus(this.getClass().getSimpleName(), "Fetching
next batch: " + cfg.targetTableName);
HoodieRecordType recordType = createRecordMerger(props).getRecordType();
@@ -567,7 +564,9 @@ public class StreamSync implements Serializable, Closeable {
throw new UnsupportedOperationException("Spark record only support
parquet log.");
}
- InputBatch inputBatch = fetchNextBatchFromSource(resumeCheckpointStr,
metaClient);
+ Pair<InputBatch, Boolean> inputBatchAndRowWriterEnabled =
fetchNextBatchFromSource(resumeCheckpointStr, metaClient);
+ InputBatch inputBatch = inputBatchAndRowWriterEnabled.getLeft();
+ boolean useRowWriter = inputBatchAndRowWriterEnabled.getRight();
final String checkpointStr = inputBatch.getCheckpointForNextBatch();
final SchemaProvider schemaProvider = inputBatch.getSchemaProvider();
@@ -584,21 +583,35 @@ public class StreamSync implements Serializable,
Closeable {
hoodieSparkContext.setJobStatus(this.getClass().getSimpleName(), "Checking
if input is empty: " + cfg.targetTableName);
if (useRowWriter) { // no additional processing required for row writer.
- return inputBatch;
+ return Pair.of(inputBatch, true);
} else {
Option<JavaRDD<HoodieRecord>> recordsOpt =
HoodieStreamerUtils.createHoodieRecords(cfg, props, inputBatch.getBatch(),
schemaProvider,
recordType, autoGenerateRecordKeys, instantTime, errorTableWriter);
- return new InputBatch(recordsOpt, checkpointStr, schemaProvider);
+ return Pair.of(new InputBatch(recordsOpt, checkpointStr,
schemaProvider), false);
}
}
+ @VisibleForTesting
+ boolean canUseRowWriter(Schema targetSchema) {
+ // enable row writer only when operation is BULK_INSERT, and source is ROW type and if row writer is not explicitly disabled.
+ boolean rowWriterEnabled = isRowWriterEnabled();
+ return rowWriterEnabled && targetSchema != null && HoodieDataTypeUtils.canUseRowWriter(targetSchema, conf);
+ }
+
+ @VisibleForTesting
+ boolean isRowWriterEnabled() {
+ return cfg.operation == WriteOperationType.BULK_INSERT && formatAdapter.getSource().getSourceType() == Source.SourceType.ROW
+ && this.props.getBoolean("hoodie.streamer.write.row.writer.enable", false);
+ }
+
/**
* Fetch data from source, apply transformations if any, align with schema
from schema provider if need be and return the input batch.
* @param resumeCheckpointStr checkpoint to resume from source.
- * @return {@link InputBatch} containing the new batch of data from source
along with new checkpoint and schema provider instance to use.
+ * @return Pair with {@link InputBatch} containing the new batch of data
from source along with new checkpoint and schema provider instance to use,
+ * and a boolean set to `true` if row writer can be used.
*/
@VisibleForTesting
- InputBatch fetchNextBatchFromSource(Option<String> resumeCheckpointStr,
HoodieTableMetaClient metaClient) {
+ Pair<InputBatch, Boolean> fetchNextBatchFromSource(Option<String>
resumeCheckpointStr, HoodieTableMetaClient metaClient) {
Option<JavaRDD<GenericRecord>> avroRDDOptional = null;
String checkpointStr = null;
SchemaProvider schemaProvider = null;
@@ -621,6 +634,7 @@ public class StreamSync implements Serializable, Closeable {
&& this.userProvidedSchemaProvider.getTargetSchema() !=
InputBatch.NULL_SCHEMA) {
// Let's deduce the schema provider for writer side first!
schemaProvider =
getDeducedSchemaProvider(this.userProvidedSchemaProvider.getTargetSchema(),
this.userProvidedSchemaProvider, metaClient);
+ boolean useRowWriter =
canUseRowWriter(schemaProvider.getTargetSchema());
if (useRowWriter) {
inputBatchForWriter = new InputBatch(transformed, checkpointStr,
schemaProvider);
} else {
@@ -657,7 +671,7 @@ public class StreamSync implements Serializable, Closeable {
.orElseGet(dataAndCheckpoint.getSchemaProvider()::getTargetSchema);
schemaProvider = getDeducedSchemaProvider(incomingSchema,
dataAndCheckpoint.getSchemaProvider(), metaClient);
- if (useRowWriter) {
+ if (canUseRowWriter(schemaProvider.getTargetSchema())) {
inputBatchForWriter = new InputBatch(transformed, checkpointStr,
schemaProvider);
} else {
// Rewrite transformed records into the expected target schema
@@ -666,11 +680,17 @@ public class StreamSync implements Serializable,
Closeable {
}
}
} else {
- if (useRowWriter) {
+ if (isRowWriterEnabled()) {
InputBatch inputBatchNeedsDeduceSchema = formatAdapter.fetchNewDataInRowFormat(resumeCheckpointStr, cfg.sourceLimit);
- inputBatchForWriter = new InputBatch<>(inputBatchNeedsDeduceSchema.getBatch(), inputBatchNeedsDeduceSchema.getCheckpointForNextBatch(),
- getDeducedSchemaProvider(inputBatchNeedsDeduceSchema.getSchemaProvider().getTargetSchema(), inputBatchNeedsDeduceSchema.getSchemaProvider(), metaClient));
- } else {
+ if (canUseRowWriter(inputBatchNeedsDeduceSchema.getSchemaProvider().getTargetSchema())) {
+ inputBatchForWriter = new InputBatch<>(inputBatchNeedsDeduceSchema.getBatch(), inputBatchNeedsDeduceSchema.getCheckpointForNextBatch(),
+ getDeducedSchemaProvider(inputBatchNeedsDeduceSchema.getSchemaProvider().getTargetSchema(), inputBatchNeedsDeduceSchema.getSchemaProvider(), metaClient));
+ } else {
+ LOG.warn("Row-writer is enabled but cannot be used due to the target schema");
+ }
+ }
+ // if row writer was enabled but the target schema prevents us from using it, do not use the row writer
+ if (inputBatchForWriter == null) {
// Pull the data from the source & prepare the write
InputBatch<JavaRDD<GenericRecord>> dataAndCheckpoint =
formatAdapter.fetchNewDataInAvroFormat(resumeCheckpointStr, cfg.sourceLimit);
checkpointStr = dataAndCheckpoint.getCheckpointForNextBatch();
@@ -694,10 +714,10 @@ public class StreamSync implements Serializable,
Closeable {
}
}
}
- if (useRowWriter) {
- return inputBatchForWriter;
+ if (inputBatchForWriter != null) {
+ return Pair.of(inputBatchForWriter, true);
} else {
- return new InputBatch(avroRDDOptional, checkpointStr, schemaProvider);
+ return Pair.of(new InputBatch(avroRDDOptional, checkpointStr,
schemaProvider), false);
}
}
@@ -829,16 +849,18 @@ public class StreamSync implements Serializable,
Closeable {
*
* @param instantTime instant time to use for ingest.
* @param inputBatch input batch that contains the records,
checkpoint, and schema provider
+ * @param useRowWriter whether to use row writer
* @param metrics Metrics
* @param overallTimerContext Timer Context
* @return Option Compaction instant if one is scheduled
*/
private Pair<Option<String>, JavaRDD<WriteStatus>>
writeToSinkAndDoMetaSync(String instantTime, InputBatch inputBatch,
+
boolean useRowWriter,
HoodieIngestionMetrics metrics,
Timer.Context overallTimerContext) {
Option<String> scheduledCompactionInstant = Option.empty();
// write to hudi and fetch result
- WriteClientWriteResult writeClientWriteResult = writeToSink(inputBatch,
instantTime);
+ WriteClientWriteResult writeClientWriteResult = writeToSink(inputBatch,
instantTime, useRowWriter);
JavaRDD<WriteStatus> writeStatusRDD =
writeClientWriteResult.getWriteStatusRDD();
Map<String, List<String>> partitionToReplacedFileIds =
writeClientWriteResult.getPartitionToReplacedFileIds();
@@ -961,7 +983,7 @@ public class StreamSync implements Serializable, Closeable {
throw lastException;
}
- private WriteClientWriteResult writeToSink(InputBatch inputBatch, String
instantTime) {
+ private WriteClientWriteResult writeToSink(InputBatch inputBatch, String
instantTime, boolean useRowWriter) {
WriteClientWriteResult writeClientWriteResult = null;
instantTime = startCommit(instantTime, !autoGenerateRecordKeys);
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestStreamSyncUnitTests.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestStreamSyncUnitTests.java
index f429943532f..6eafe46baef 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestStreamSyncUnitTests.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestStreamSyncUnitTests.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieErrorTableConfig;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.hadoop.HoodieHadoopStorage;
@@ -116,14 +117,17 @@ public class TestStreamSyncUnitTests {
//Actually create the deltastreamer
StreamSync streamSync = new StreamSync(cfg, sparkSession, propsSpy,
hoodieSparkEngineContext,
- storage, configuration, client -> true, schemaProvider, errorTableWriterOption, sourceFormatAdapter, transformerOption, useRowWriter, false);
+ storage, configuration, client -> true, schemaProvider, errorTableWriterOption, sourceFormatAdapter, transformerOption, false);
StreamSync spy = spy(streamSync);
+ doReturn(useRowWriter).when(spy).canUseRowWriter(any());
+ doReturn(useRowWriter).when(spy).isRowWriterEnabled();
SchemaProvider deducedSchemaProvider;
deducedSchemaProvider = getSchemaProvider("deduced", false);
doReturn(deducedSchemaProvider).when(spy).getDeducedSchemaProvider(any(),
any(), any());
//run the method we are unit testing:
- InputBatch batch = spy.fetchNextBatchFromSource(Option.empty(),
mock(HoodieTableMetaClient.class));
+ Pair<InputBatch, Boolean> batchAndUseRowWriter =
spy.fetchNextBatchFromSource(Option.empty(), mock(HoodieTableMetaClient.class));
+ InputBatch batch = batchAndUseRowWriter.getLeft();
//make sure getDeducedSchemaProvider is always called once
verify(spy, times(1)).getDeducedSchemaProvider(any(), any(), any());
@@ -152,7 +156,7 @@ public class TestStreamSyncUnitTests {
when(commitsTimeline.lastInstant()).thenReturn(Option.of(hoodieInstant));
StreamSync streamSync = new StreamSync(cfg, sparkSession, props,
hoodieSparkEngineContext,
- storage, configuration, client -> true,
null,Option.empty(),null,Option.empty(),true,true);
+ storage, configuration, client -> true, null, Option.empty(), null,
Option.empty(), true);
StreamSync spy = spy(streamSync);
doReturn(Option.of(commitMetadata)).when(spy).getLatestCommitMetadataWithValidCheckpointInfo(any());