This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new e04dc0951cf [HUDI-5315] Use sample writes to estimate record size (#8390)
e04dc0951cf is described below
commit e04dc0951cf21122f0d3dd4f673b87b663253109
Author: Shiyan Xu <[email protected]>
AuthorDate: Thu May 4 19:05:13 2023 +0800
[HUDI-5315] Use sample writes to estimate record size (#8390)
---
.../org/apache/hudi/config/HoodieWriteConfig.java | 3 +-
.../hudi/common/table/HoodieTableMetaClient.java | 1 +
.../config/HoodieDeltaStreamerConfig.java | 13 ++
.../hudi/utilities/deltastreamer/DeltaSync.java | 37 ++---
.../deltastreamer/SparkSampleWritesUtils.java | 159 +++++++++++++++++++++
.../deltastreamer/TestSparkSampleWritesUtils.java | 106 ++++++++++++++
6 files changed, 302 insertions(+), 17 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 50ba109c0de..b38ceeeea98 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -102,6 +102,7 @@ import java.util.stream.Collectors;
import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
import static org.apache.hudi.config.HoodieCleanConfig.CLEANER_POLICY;
+import static org.apache.hudi.config.HoodieCompactionConfig.COPY_ON_WRITE_RECORD_SIZE_ESTIMATE;
import static org.apache.hudi.table.marker.ConflictDetectionUtils.getDefaultEarlyConflictDetectionStrategy;
/**
@@ -1453,7 +1454,7 @@ public class HoodieWriteConfig extends HoodieConfig {
}
public int getCopyOnWriteRecordSizeEstimate() {
- return getInt(HoodieCompactionConfig.COPY_ON_WRITE_RECORD_SIZE_ESTIMATE);
+ return getInt(COPY_ON_WRITE_RECORD_SIZE_ESTIMATE);
}
public boolean allowMultipleCleans() {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
index 7d248dc182b..87bdd09db10 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
@@ -86,6 +86,7 @@ public class HoodieTableMetaClient implements Serializable {
public static final String TEMPFOLDER_NAME = METAFOLDER_NAME + Path.SEPARATOR + ".temp";
public static final String AUXILIARYFOLDER_NAME = METAFOLDER_NAME + Path.SEPARATOR + ".aux";
public static final String BOOTSTRAP_INDEX_ROOT_FOLDER_PATH = AUXILIARYFOLDER_NAME + Path.SEPARATOR + ".bootstrap";
+ public static final String SAMPLE_WRITES_FOLDER_PATH = AUXILIARYFOLDER_NAME + Path.SEPARATOR + ".sample_writes";
public static final String HEARTBEAT_FOLDER_NAME = METAFOLDER_NAME + Path.SEPARATOR + ".heartbeat";
public static final String METADATA_TABLE_FOLDER_PATH = METAFOLDER_NAME + Path.SEPARATOR + "metadata";
public static final String HASHING_METADATA_FOLDER_NAME = ".bucket_index" + Path.SEPARATOR + "consistent_hashing_metadata";
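For orientation, the new constant composes with the existing folder constants; a hedged illustration (not part of the diff), assuming METAFOLDER_NAME resolves to ".hoodie":

    // SAMPLE_WRITES_FOLDER_PATH composes to ".hoodie/.aux/.sample_writes";
    // each sampling run writes under a per-instant subdirectory, e.g.:
    String sampleWritesDir = tableBasePath + "/" + SAMPLE_WRITES_FOLDER_PATH + "/" + instantTime; // tableBasePath, instantTime are hypothetical locals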
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HoodieDeltaStreamerConfig.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HoodieDeltaStreamerConfig.java
index 86d154b50ed..2401a2bf3b1 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HoodieDeltaStreamerConfig.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HoodieDeltaStreamerConfig.java
@@ -26,6 +26,8 @@ import org.apache.hudi.common.config.HoodieConfig;
import javax.annotation.concurrent.Immutable;
+import static org.apache.hudi.config.HoodieCompactionConfig.COPY_ON_WRITE_RECORD_SIZE_ESTIMATE;
+
/**
* Delta Streamer related config.
*/
@@ -93,4 +95,15 @@ public class HoodieDeltaStreamerConfig extends HoodieConfig {
.withDocumentation("The path to which a particular table is ingested. The config is specific to HoodieMultiTableDeltaStreamer"
+ " and overrides path determined using option `--base-path-prefix` for a table. This config is ignored for a single"
+ " table deltastreamer");
+ public static final ConfigProperty<Boolean> SAMPLE_WRITES_ENABLED = ConfigProperty
+ .key(DELTA_STREAMER_CONFIG_PREFIX + "sample.writes.enabled")
+ .defaultValue(false)
+ .withDocumentation("Set this to true to sample from the first batch of records and write to the auxiliary path, before writing to the table."
+ + " The sampled records are used to calculate the average record size. The relevant write client will have `" + COPY_ON_WRITE_RECORD_SIZE_ESTIMATE.key()
+ + "` overwritten by the calculated result.");
+ public static final ConfigProperty<Integer> SAMPLE_WRITES_SIZE = ConfigProperty
+ .key(DELTA_STREAMER_CONFIG_PREFIX + "sample.writes.size")
+ .defaultValue(5000)
+ .withDocumentation("Number of records to sample from the first write. To improve the estimation's accuracy, use a larger sample size for smaller or more compressible records, and a smaller sample size for larger or less compressible records.");
}
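For reference, a minimal sketch (not part of this commit) of how a Delta Streamer pipeline could opt in to sample writes, assuming DELTA_STREAMER_CONFIG_PREFIX resolves to "hoodie.deltastreamer.":

    TypedProperties props = new TypedProperties();
    // Enable one-off sampling before the first write to the target table.
    props.put("hoodie.deltastreamer.sample.writes.enabled", "true");
    // Optional: a larger sample improves accuracy for small, highly compressible records.
    props.put("hoodie.deltastreamer.sample.writes.size", "10000");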
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index 8e2b03c7849..daae0a385c6 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -148,10 +148,10 @@ import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_BUCKET_SYNC;
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_BUCKET_SYNC_SPEC;
import static org.apache.hudi.utilities.UtilHelpers.createRecordMerger;
import static org.apache.hudi.utilities.config.HoodieDeltaStreamerConfig.MUTLI_WRITER_SOURCE_CHECKPOINT_ID;
+import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_FORCE_SKIP_PROP;
import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_KEY;
-import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.DEFAULT_CHECKPOINT_FORCE_SKIP_PROP;
import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_RESET_KEY;
-import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_FORCE_SKIP_PROP;
+import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.DEFAULT_CHECKPOINT_FORCE_SKIP_PROP;
import static org.apache.hudi.utilities.schema.RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE;
import static org.apache.hudi.utilities.schema.RowBasedSchemaProvider.HOODIE_RECORD_STRUCT_NAME;
@@ -398,13 +398,14 @@ public class DeltaSync implements Serializable, Closeable {
Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> srcRecordsWithCkpt = readFromSource(commitsTimelineOpt);
- if (null != srcRecordsWithCkpt) {
+ if (srcRecordsWithCkpt != null) {
+ final JavaRDD<HoodieRecord> recordsFromSource = srcRecordsWithCkpt.getRight().getRight();
// this is the first input batch. If schemaProvider not set, use it and register Avro Schema and start
// compactor
- if (null == writeClient) {
+ if (writeClient == null) {
this.schemaProvider = srcRecordsWithCkpt.getKey();
// Setup HoodieWriteClient and compaction now that we decided on schema
- setupWriteClient();
+ setupWriteClient(recordsFromSource);
} else {
Schema newSourceSchema = srcRecordsWithCkpt.getKey().getSourceSchema();
Schema newTargetSchema = srcRecordsWithCkpt.getKey().getTargetSchema();
@@ -413,7 +414,7 @@ public class DeltaSync implements Serializable, Closeable {
LOG.info("Seeing new schema. Source :" + newSourceSchema.toString(true)
+ ", Target :" + newTargetSchema.toString(true));
// We need to recreate write client with new schema and register them.
- reInitWriteClient(newSourceSchema, newTargetSchema);
+ reInitWriteClient(newSourceSchema, newTargetSchema, recordsFromSource);
processedSchema.addSchema(newSourceSchema);
processedSchema.addSchema(newTargetSchema);
}
@@ -427,7 +428,7 @@ public class DeltaSync implements Serializable, Closeable {
}
}
- result = writeToSink(srcRecordsWithCkpt.getRight().getRight(),
+ result = writeToSink(recordsFromSource,
srcRecordsWithCkpt.getRight().getLeft(), metrics, overallTimerContext);
}
@@ -944,34 +945,38 @@ public class DeltaSync implements Serializable, Closeable {
* SchemaProvider creation is a precursor to HoodieWriteClient and AsyncCompactor creation. This method takes care of
* this constraint.
*/
- public void setupWriteClient() throws IOException {
+ private void setupWriteClient(JavaRDD<HoodieRecord> records) throws IOException {
if ((null != schemaProvider)) {
Schema sourceSchema = schemaProvider.getSourceSchema();
Schema targetSchema = schemaProvider.getTargetSchema();
- reInitWriteClient(sourceSchema, targetSchema);
+ reInitWriteClient(sourceSchema, targetSchema, records);
}
}
- private void reInitWriteClient(Schema sourceSchema, Schema targetSchema) throws IOException {
+ private void reInitWriteClient(Schema sourceSchema, Schema targetSchema, JavaRDD<HoodieRecord> records) throws IOException {
LOG.info("Setting up new Hoodie Write Client");
if (isDropPartitionColumns()) {
targetSchema = HoodieAvroUtils.removeFields(targetSchema, getPartitionColumns(keyGenerator, props));
}
registerAvroSchemas(sourceSchema, targetSchema);
- HoodieWriteConfig hoodieCfg = getHoodieClientConfig(targetSchema);
- if (hoodieCfg.isEmbeddedTimelineServerEnabled()) {
+ final HoodieWriteConfig initialWriteConfig = getHoodieClientConfig(targetSchema);
+ final HoodieWriteConfig writeConfig = SparkSampleWritesUtils
+ .getWriteConfigWithRecordSizeEstimate(jssc, records, initialWriteConfig)
+ .orElse(initialWriteConfig);
+
+ if (writeConfig.isEmbeddedTimelineServerEnabled()) {
if (!embeddedTimelineService.isPresent()) {
- embeddedTimelineService = EmbeddedTimelineServerHelper.createEmbeddedTimelineService(new HoodieSparkEngineContext(jssc), hoodieCfg);
+ embeddedTimelineService = EmbeddedTimelineServerHelper.createEmbeddedTimelineService(new HoodieSparkEngineContext(jssc), writeConfig);
} else {
- EmbeddedTimelineServerHelper.updateWriteConfigWithTimelineServer(embeddedTimelineService.get(), hoodieCfg);
+ EmbeddedTimelineServerHelper.updateWriteConfigWithTimelineServer(embeddedTimelineService.get(), writeConfig);
}
}
- if (null != writeClient) {
+ if (writeClient != null) {
// Close Write client.
writeClient.close();
}
- writeClient = new SparkRDDWriteClient<>(new HoodieSparkEngineContext(jssc), hoodieCfg, embeddedTimelineService);
+ writeClient = new SparkRDDWriteClient<>(new HoodieSparkEngineContext(jssc), writeConfig, embeddedTimelineService);
onInitializingHoodieWriteClient.apply(writeClient);
}
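Condensed, the wiring above treats the record-size override as an optional rewrite of the freshly built config: when sampling is disabled or the table timeline is non-empty, the utility returns an empty Option and the initial config is used unchanged (names exactly as in the diff):

    final HoodieWriteConfig initialWriteConfig = getHoodieClientConfig(targetSchema);
    final HoodieWriteConfig writeConfig = SparkSampleWritesUtils
        .getWriteConfigWithRecordSizeEstimate(jssc, records, initialWriteConfig)
        .orElse(initialWriteConfig); // fall back to the unmodified config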
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SparkSampleWritesUtils.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SparkSampleWritesUtils.java
new file mode 100644
index 00000000000..533a9680303
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SparkSampleWritesUtils.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.deltastreamer;
+
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.hadoop.CachingPath;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.util.List;
+
+import static org.apache.hudi.common.table.HoodieTableMetaClient.SAMPLE_WRITES_FOLDER_PATH;
+import static org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator.getInstantFromTemporalAccessor;
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+import static org.apache.hudi.config.HoodieCompactionConfig.COPY_ON_WRITE_RECORD_SIZE_ESTIMATE;
+import static org.apache.hudi.utilities.config.HoodieDeltaStreamerConfig.SAMPLE_WRITES_ENABLED;
+import static org.apache.hudi.utilities.config.HoodieDeltaStreamerConfig.SAMPLE_WRITES_SIZE;
+
+/**
+ * This utility class estimates the average record size by writing sampled incoming records
+ * to `.hoodie/.aux/.sample_writes/<instant time>/<epoch millis>` and reading the commit metadata.
+ * <p>
+ * TODO handle sample_writes sub-path clean-up w.r.t. rollback and insert overwrite. (HUDI-6044)
+ */
+public class SparkSampleWritesUtils {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SparkSampleWritesUtils.class);
+
+ public static Option<HoodieWriteConfig> getWriteConfigWithRecordSizeEstimate(JavaSparkContext jsc, JavaRDD<HoodieRecord> records, HoodieWriteConfig writeConfig) {
+ if (!writeConfig.getBoolean(SAMPLE_WRITES_ENABLED)) {
+ LOG.debug("Skip overwriting record size estimate as it's disabled.");
+ return Option.empty();
+ }
+ HoodieTableMetaClient metaClient = getMetaClient(jsc, writeConfig.getBasePath());
+ if (metaClient.isTimelineNonEmpty()) {
+ LOG.info("Skip overwriting record size estimate because the timeline is non-empty.");
+ return Option.empty();
+ }
+ try {
+ String instantTime = getInstantFromTemporalAccessor(Instant.now().atZone(ZoneId.systemDefault()));
+ Pair<Boolean, String> result = doSampleWrites(jsc, records, writeConfig, instantTime);
+ if (result.getLeft()) {
+ long avgSize = getAvgSizeFromSampleWrites(jsc, result.getRight());
+ LOG.info("Overwriting record size estimate to " + avgSize);
+ TypedProperties props = writeConfig.getProps();
+ props.put(COPY_ON_WRITE_RECORD_SIZE_ESTIMATE.key(), String.valueOf(avgSize));
+ return Option.of(HoodieWriteConfig.newBuilder().withProperties(props).build());
+ }
+ } catch (IOException e) {
+ LOG.error(String.format("Not overwriting record size estimate for table %s due to an error during sample writes.", writeConfig.getTableName()), e);
+ }
+ return Option.empty();
+ }
+
+ private static Pair<Boolean, String> doSampleWrites(JavaSparkContext jsc, JavaRDD<HoodieRecord> records, HoodieWriteConfig writeConfig, String instantTime)
+ throws IOException {
+ final String sampleWritesBasePath = getSampleWritesBasePath(jsc, writeConfig, instantTime);
+ HoodieTableMetaClient.withPropertyBuilder()
+ .setTableType(HoodieTableType.COPY_ON_WRITE)
+ .setTableName(String.format("%s_samples_%s", writeConfig.getTableName(), instantTime))
+ .setCDCEnabled(false)
+ .initTable(jsc.hadoopConfiguration(), sampleWritesBasePath);
+ TypedProperties props = writeConfig.getProps();
+ props.put(SAMPLE_WRITES_ENABLED.key(), "false");
+ final HoodieWriteConfig sampleWriteConfig = HoodieWriteConfig.newBuilder()
+ .withProps(props)
+ .withTableServicesEnabled(false)
+ .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
+ .withSchemaEvolutionEnable(false)
+ .withBulkInsertParallelism(1)
+ .withAutoCommit(true)
+ .withPath(sampleWritesBasePath)
+ .build();
+ try (SparkRDDWriteClient sampleWriteClient = new SparkRDDWriteClient(new HoodieSparkEngineContext(jsc), sampleWriteConfig, Option.empty())) {
+ int size = writeConfig.getIntOrDefault(SAMPLE_WRITES_SIZE);
+ List<HoodieRecord> samples = records.coalesce(1).take(size);
+ sampleWriteClient.startCommitWithTime(instantTime);
+ JavaRDD<WriteStatus> writeStatusRDD = sampleWriteClient.bulkInsert(jsc.parallelize(samples, 1), instantTime);
+ if (writeStatusRDD.filter(WriteStatus::hasErrors).count() > 0) {
+ LOG.error(String.format("Sample writes for table %s failed with errors.", writeConfig.getTableName()));
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Printing out the top 100 errors");
+ writeStatusRDD.filter(WriteStatus::hasErrors).take(100).forEach(ws -> {
+ LOG.trace("Global error :", ws.getGlobalError());
+ ws.getErrors().forEach((key, throwable) ->
+ LOG.trace(String.format("Error for key: %s", key), throwable));
+ });
+ }
+ return Pair.of(false, null);
+ } else {
+ return Pair.of(true, sampleWritesBasePath);
+ }
+ }
+ }
+
+ private static String getSampleWritesBasePath(JavaSparkContext jsc, HoodieWriteConfig writeConfig, String instantTime) throws IOException {
+ Path basePath = new CachingPath(writeConfig.getBasePath(), SAMPLE_WRITES_FOLDER_PATH + Path.SEPARATOR + instantTime);
+ FileSystem fs = FSUtils.getFs(basePath, jsc.hadoopConfiguration());
+ if (fs.exists(basePath)) {
+ fs.delete(basePath, true);
+ }
+ return basePath.toString();
+ }
+
+ private static long getAvgSizeFromSampleWrites(JavaSparkContext jsc, String sampleWritesBasePath) throws IOException {
+ HoodieTableMetaClient metaClient = getMetaClient(jsc, sampleWritesBasePath);
+ Option<HoodieInstant> lastInstantOpt = metaClient.getCommitTimeline().filterCompletedInstants().lastInstant();
+ checkState(lastInstantOpt.isPresent(), "The only completed instant should be present in sample_writes table.");
+ HoodieInstant instant = lastInstantOpt.get();
+ HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
+ .fromBytes(metaClient.getCommitTimeline().getInstantDetails(instant).get(), HoodieCommitMetadata.class);
+ long totalBytesWritten = commitMetadata.fetchTotalBytesWritten();
+ long totalRecordsWritten = commitMetadata.fetchTotalRecordsWritten();
+ return (long) Math.ceil((1.0 * totalBytesWritten) / totalRecordsWritten);
+ }
+
+ private static HoodieTableMetaClient getMetaClient(JavaSparkContext jsc, String basePath) {
+ FileSystem fs = FSUtils.getFs(basePath, jsc.hadoopConfiguration());
+ return HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(basePath).build();
+ }
+}
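To make the estimation step concrete, here is a worked instance of the formula in getAvgSizeFromSampleWrites, with hypothetical byte counts (the resulting 779 happens to match the expectation in the new test below, within its tolerance):

    long totalBytesWritten = 3_895_000L; // hypothetical total from the sample commit metadata
    long totalRecordsWritten = 5_000L;   // the default SAMPLE_WRITES_SIZE
    // ceil(3,895,000 / 5,000) = 779 bytes/record; this value then overrides
    // COPY_ON_WRITE_RECORD_SIZE_ESTIMATE in the returned write config.
    long avgSize = (long) Math.ceil((1.0 * totalBytesWritten) / totalRecordsWritten);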
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestSparkSampleWritesUtils.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestSparkSampleWritesUtils.java
new file mode 100644
index 00000000000..3a7516aab24
--- /dev/null
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestSparkSampleWritesUtils.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.deltastreamer;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.testutils.HoodieTestTable;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
+import org.apache.hudi.utilities.config.HoodieDeltaStreamerConfig;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestSparkSampleWritesUtils extends SparkClientFunctionalTestHarness {
+
+ private HoodieTestDataGenerator dataGen;
+ private HoodieTableMetaClient metaClient;
+
+ @BeforeEach
+ public void setUp() throws IOException {
+ dataGen = new HoodieTestDataGenerator(0xDEED);
+ metaClient = getHoodieMetaClient(HoodieTableType.COPY_ON_WRITE);
+ }
+
+ @AfterEach
+ public void tearDown() {
+ dataGen.close();
+ }
+
+ /*
+ * TODO remove this and fix parent class (HUDI-6042)
+ */
+ @Override
+ public String basePath() {
+ return tempDir.toAbsolutePath().toString();
+ }
+
+ @Test
+ public void skipOverwriteRecordSizeEstimateWhenTimelineNonEmpty() throws Exception {
+ String commitTime = HoodieTestTable.makeNewCommitTime();
+ HoodieTestTable.of(metaClient).addCommit(commitTime);
+ int originalRecordSize = 100;
+ TypedProperties props = new TypedProperties();
+ props.put(HoodieDeltaStreamerConfig.SAMPLE_WRITES_ENABLED.key(), "true");
+ props.put(HoodieCompactionConfig.COPY_ON_WRITE_RECORD_SIZE_ESTIMATE.key(), String.valueOf(originalRecordSize));
+ HoodieWriteConfig originalWriteConfig = HoodieWriteConfig.newBuilder()
+ .withProperties(props)
+ .withPath(basePath())
+ .build();
+ JavaRDD<HoodieRecord> records = jsc().parallelize(dataGen.generateInserts(commitTime, 1), 1);
+ Option<HoodieWriteConfig> writeConfigOpt = SparkSampleWritesUtils.getWriteConfigWithRecordSizeEstimate(jsc(), records, originalWriteConfig);
+ assertFalse(writeConfigOpt.isPresent());
+ assertEquals(originalRecordSize, originalWriteConfig.getCopyOnWriteRecordSizeEstimate(), "Original record size estimate should not be changed.");
+ }
+
+ @Test
+ public void overwriteRecordSizeEstimateForEmptyTable() {
+ int originalRecordSize = 100;
+ TypedProperties props = new TypedProperties();
+ props.put(HoodieDeltaStreamerConfig.SAMPLE_WRITES_ENABLED.key(), "true");
+ props.put(HoodieCompactionConfig.COPY_ON_WRITE_RECORD_SIZE_ESTIMATE.key(), String.valueOf(originalRecordSize));
+ HoodieWriteConfig originalWriteConfig = HoodieWriteConfig.newBuilder()
+ .withProperties(props)
+ .forTable("foo")
+ .withPath(basePath())
+ .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
+ .build();
+
+ String commitTime = HoodieTestDataGenerator.getCommitTimeAtUTC(1);
+ JavaRDD<HoodieRecord> records = jsc().parallelize(dataGen.generateInserts(commitTime, 2000), 2);
+ Option<HoodieWriteConfig> writeConfigOpt = SparkSampleWritesUtils.getWriteConfigWithRecordSizeEstimate(jsc(), records, originalWriteConfig);
+ assertTrue(writeConfigOpt.isPresent());
+ assertEquals(779.0, writeConfigOpt.get().getCopyOnWriteRecordSizeEstimate(), 10.0);
+ }
+}