This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new e04dc0951cf [HUDI-5315] Use sample writes to estimate record size (#8390)
e04dc0951cf is described below

commit e04dc0951cf21122f0d3dd4f673b87b663253109
Author: Shiyan Xu <[email protected]>
AuthorDate: Thu May 4 19:05:13 2023 +0800

    [HUDI-5315] Use sample writes to estimate record size (#8390)
---
 .../org/apache/hudi/config/HoodieWriteConfig.java  |   3 +-
 .../hudi/common/table/HoodieTableMetaClient.java   |   1 +
 .../config/HoodieDeltaStreamerConfig.java          |  13 ++
 .../hudi/utilities/deltastreamer/DeltaSync.java    |  37 ++---
 .../deltastreamer/SparkSampleWritesUtils.java      | 159 +++++++++++++++++++++
 .../deltastreamer/TestSparkSampleWritesUtils.java  | 106 ++++++++++++++
 6 files changed, 302 insertions(+), 17 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 50ba109c0de..b38ceeeea98 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -102,6 +102,7 @@ import java.util.stream.Collectors;
 
 import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
 import static org.apache.hudi.config.HoodieCleanConfig.CLEANER_POLICY;
+import static org.apache.hudi.config.HoodieCompactionConfig.COPY_ON_WRITE_RECORD_SIZE_ESTIMATE;
 import static org.apache.hudi.table.marker.ConflictDetectionUtils.getDefaultEarlyConflictDetectionStrategy;
 
 /**
@@ -1453,7 +1454,7 @@ public class HoodieWriteConfig extends HoodieConfig {
   }
 
   public int getCopyOnWriteRecordSizeEstimate() {
-    return getInt(HoodieCompactionConfig.COPY_ON_WRITE_RECORD_SIZE_ESTIMATE);
+    return getInt(COPY_ON_WRITE_RECORD_SIZE_ESTIMATE);
   }
 
   public boolean allowMultipleCleans() {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
index 7d248dc182b..87bdd09db10 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
@@ -86,6 +86,7 @@ public class HoodieTableMetaClient implements Serializable {
   public static final String TEMPFOLDER_NAME = METAFOLDER_NAME + Path.SEPARATOR + ".temp";
   public static final String AUXILIARYFOLDER_NAME = METAFOLDER_NAME + Path.SEPARATOR + ".aux";
   public static final String BOOTSTRAP_INDEX_ROOT_FOLDER_PATH = AUXILIARYFOLDER_NAME + Path.SEPARATOR + ".bootstrap";
+  public static final String SAMPLE_WRITES_FOLDER_PATH = AUXILIARYFOLDER_NAME + Path.SEPARATOR + ".sample_writes";
   public static final String HEARTBEAT_FOLDER_NAME = METAFOLDER_NAME + Path.SEPARATOR + ".heartbeat";
   public static final String METADATA_TABLE_FOLDER_PATH = METAFOLDER_NAME + Path.SEPARATOR + "metadata";
   public static final String HASHING_METADATA_FOLDER_NAME = ".bucket_index" + Path.SEPARATOR + "consistent_hashing_metadata";
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HoodieDeltaStreamerConfig.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HoodieDeltaStreamerConfig.java
index 86d154b50ed..2401a2bf3b1 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HoodieDeltaStreamerConfig.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HoodieDeltaStreamerConfig.java
@@ -26,6 +26,8 @@ import org.apache.hudi.common.config.HoodieConfig;
 
 import javax.annotation.concurrent.Immutable;
 
+import static org.apache.hudi.config.HoodieCompactionConfig.COPY_ON_WRITE_RECORD_SIZE_ESTIMATE;
+
 /**
  * Delta Streamer related config.
  */
@@ -93,4 +95,15 @@ public class HoodieDeltaStreamerConfig extends HoodieConfig {
       .withDocumentation("The path to which a particular table is ingested. 
The config is specific to HoodieMultiTableDeltaStreamer"
           + " and overrides path determined using option `--base-path-prefix` 
for a table. This config is ignored for a single"
           + " table deltastreamer");
+  public static final ConfigProperty<Boolean> SAMPLE_WRITES_ENABLED = 
ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "sample.writes.enabled")
+      .defaultValue(false)
+      .withDocumentation("Set this to true to sample from the first batch of 
records and write to the auxiliary path, before writing to the table."
+          + "The sampled records are used to calculate the average record 
size. The relevant write client will have `" + 
COPY_ON_WRITE_RECORD_SIZE_ESTIMATE.key()
+          + "` being overwritten by the calculated result.");
+  public static final ConfigProperty<Integer> SAMPLE_WRITES_SIZE = 
ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "sample.writes.size")
+      .defaultValue(5000)
+      .withDocumentation("Number of records to sample from the first write. To 
improve the estimation's accuracy, "
+          + "for smaller or more compressable record size, set the sample size 
bigger. For bigger or less compressable record size, set smaller.");
 }
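
As a usage illustration (a minimal sketch, not part of this commit), the two options above can be wired into DeltaStreamer properties via the ConfigProperty constants shown in this hunk; the size value 10000 is an arbitrary example:

    // Hypothetical setup: enable sample-writes-based record size estimation.
    TypedProperties props = new TypedProperties();
    props.put(HoodieDeltaStreamerConfig.SAMPLE_WRITES_ENABLED.key(), "true");
    // Optionally raise the sample size, e.g. for small or highly compressible records.
    props.put(HoodieDeltaStreamerConfig.SAMPLE_WRITES_SIZE.key(), "10000");
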
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index 8e2b03c7849..daae0a385c6 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -148,10 +148,10 @@ import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_BUCKET_SYNC;
 import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_BUCKET_SYNC_SPEC;
 import static org.apache.hudi.utilities.UtilHelpers.createRecordMerger;
 import static org.apache.hudi.utilities.config.HoodieDeltaStreamerConfig.MUTLI_WRITER_SOURCE_CHECKPOINT_ID;
+import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_FORCE_SKIP_PROP;
 import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_KEY;
-import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.DEFAULT_CHECKPOINT_FORCE_SKIP_PROP;
 import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_RESET_KEY;
-import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_FORCE_SKIP_PROP;
+import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.DEFAULT_CHECKPOINT_FORCE_SKIP_PROP;
 import static org.apache.hudi.utilities.schema.RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE;
 import static org.apache.hudi.utilities.schema.RowBasedSchemaProvider.HOODIE_RECORD_STRUCT_NAME;
 
@@ -398,13 +398,14 @@ public class DeltaSync implements Serializable, Closeable {
 
     Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> srcRecordsWithCkpt = readFromSource(commitsTimelineOpt);
 
-    if (null != srcRecordsWithCkpt) {
+    if (srcRecordsWithCkpt != null) {
+      final JavaRDD<HoodieRecord> recordsFromSource = srcRecordsWithCkpt.getRight().getRight();
       // this is the first input batch. If schemaProvider not set, use it and register Avro Schema and start
       // compactor
-      if (null == writeClient) {
+      if (writeClient == null) {
         this.schemaProvider = srcRecordsWithCkpt.getKey();
         // Setup HoodieWriteClient and compaction now that we decided on schema
-        setupWriteClient();
+        setupWriteClient(recordsFromSource);
       } else {
         Schema newSourceSchema = srcRecordsWithCkpt.getKey().getSourceSchema();
         Schema newTargetSchema = srcRecordsWithCkpt.getKey().getTargetSchema();
@@ -413,7 +414,7 @@ public class DeltaSync implements Serializable, Closeable {
           LOG.info("Seeing new schema. Source :" + 
newSourceSchema.toString(true)
               + ", Target :" + newTargetSchema.toString(true));
           // We need to recreate write client with new schema and register them.
-          reInitWriteClient(newSourceSchema, newTargetSchema);
+          reInitWriteClient(newSourceSchema, newTargetSchema, recordsFromSource);
           processedSchema.addSchema(newSourceSchema);
           processedSchema.addSchema(newTargetSchema);
         }
@@ -427,7 +428,7 @@ public class DeltaSync implements Serializable, Closeable {
         }
       }
 
-      result = writeToSink(srcRecordsWithCkpt.getRight().getRight(),
+      result = writeToSink(recordsFromSource,
           srcRecordsWithCkpt.getRight().getLeft(), metrics, overallTimerContext);
     }
 
@@ -944,34 +945,38 @@ public class DeltaSync implements Serializable, Closeable {
    * SchemaProvider creation is a precursor to HoodieWriteClient and AsyncCompactor creation. This method takes care of
    * this constraint.
    */
-  public void setupWriteClient() throws IOException {
+  private void setupWriteClient(JavaRDD<HoodieRecord> records) throws IOException {
     if ((null != schemaProvider)) {
       Schema sourceSchema = schemaProvider.getSourceSchema();
       Schema targetSchema = schemaProvider.getTargetSchema();
-      reInitWriteClient(sourceSchema, targetSchema);
+      reInitWriteClient(sourceSchema, targetSchema, records);
     }
   }
 
-  private void reInitWriteClient(Schema sourceSchema, Schema targetSchema) throws IOException {
+  private void reInitWriteClient(Schema sourceSchema, Schema targetSchema, JavaRDD<HoodieRecord> records) throws IOException {
     LOG.info("Setting up new Hoodie Write Client");
     if (isDropPartitionColumns()) {
       targetSchema = HoodieAvroUtils.removeFields(targetSchema, getPartitionColumns(keyGenerator, props));
     }
     registerAvroSchemas(sourceSchema, targetSchema);
-    HoodieWriteConfig hoodieCfg = getHoodieClientConfig(targetSchema);
-    if (hoodieCfg.isEmbeddedTimelineServerEnabled()) {
+    final HoodieWriteConfig initialWriteConfig = getHoodieClientConfig(targetSchema);
+    final HoodieWriteConfig writeConfig = SparkSampleWritesUtils
+        .getWriteConfigWithRecordSizeEstimate(jssc, records, initialWriteConfig)
+        .orElse(initialWriteConfig);
+
+    if (writeConfig.isEmbeddedTimelineServerEnabled()) {
       if (!embeddedTimelineService.isPresent()) {
-        embeddedTimelineService = EmbeddedTimelineServerHelper.createEmbeddedTimelineService(new HoodieSparkEngineContext(jssc), hoodieCfg);
+        embeddedTimelineService = EmbeddedTimelineServerHelper.createEmbeddedTimelineService(new HoodieSparkEngineContext(jssc), writeConfig);
       } else {
-        EmbeddedTimelineServerHelper.updateWriteConfigWithTimelineServer(embeddedTimelineService.get(), hoodieCfg);
+        EmbeddedTimelineServerHelper.updateWriteConfigWithTimelineServer(embeddedTimelineService.get(), writeConfig);
       }
     }
 
-    if (null != writeClient) {
+    if (writeClient != null) {
       // Close Write client.
       writeClient.close();
     }
-    writeClient = new SparkRDDWriteClient<>(new HoodieSparkEngineContext(jssc), hoodieCfg, embeddedTimelineService);
+    writeClient = new SparkRDDWriteClient<>(new HoodieSparkEngineContext(jssc), writeConfig, embeddedTimelineService);
     onInitializingHoodieWriteClient.apply(writeClient);
   }
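
The net effect of the DeltaSync changes is the fallback pattern below (a sketch condensed from the hunk above, assuming the surrounding jssc and records variables): the utility returns Option.empty() when sampling is disabled or the table already has commits, so the initially computed config is used unchanged.

    HoodieWriteConfig initialWriteConfig = getHoodieClientConfig(targetSchema);
    // Fall back to the initial config when no record size estimate was produced.
    HoodieWriteConfig writeConfig = SparkSampleWritesUtils
        .getWriteConfigWithRecordSizeEstimate(jssc, records, initialWriteConfig)
        .orElse(initialWriteConfig);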
 
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SparkSampleWritesUtils.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SparkSampleWritesUtils.java
new file mode 100644
index 00000000000..533a9680303
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SparkSampleWritesUtils.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.deltastreamer;
+
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.hadoop.CachingPath;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.util.List;
+
+import static org.apache.hudi.common.table.HoodieTableMetaClient.SAMPLE_WRITES_FOLDER_PATH;
+import static org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator.getInstantFromTemporalAccessor;
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+import static org.apache.hudi.config.HoodieCompactionConfig.COPY_ON_WRITE_RECORD_SIZE_ESTIMATE;
+import static org.apache.hudi.utilities.config.HoodieDeltaStreamerConfig.SAMPLE_WRITES_ENABLED;
+import static org.apache.hudi.utilities.config.HoodieDeltaStreamerConfig.SAMPLE_WRITES_SIZE;
+
+/**
+ * Utility class dedicated to estimating the average record size by writing sample incoming records
+ * to `.hoodie/.aux/.sample_writes/<instant time>/<epoch millis>` and reading the commit metadata.
+ * <p>
+ * TODO handle sample_writes sub-path clean-up w.r.t. rollback and insert overwrite. (HUDI-6044)
+ */
+public class SparkSampleWritesUtils {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SparkSampleWritesUtils.class);
+
+  public static Option<HoodieWriteConfig> getWriteConfigWithRecordSizeEstimate(JavaSparkContext jsc, JavaRDD<HoodieRecord> records, HoodieWriteConfig writeConfig) {
+    if (!writeConfig.getBoolean(SAMPLE_WRITES_ENABLED)) {
+      LOG.debug("Skip overwriting record size estimate as it's disabled.");
+      return Option.empty();
+    }
+    HoodieTableMetaClient metaClient = getMetaClient(jsc, writeConfig.getBasePath());
+    if (metaClient.isTimelineNonEmpty()) {
+      LOG.info("Skip overwriting record size estimate as the timeline is non-empty.");
+      return Option.empty();
+    }
+    try {
+      String instantTime = getInstantFromTemporalAccessor(Instant.now().atZone(ZoneId.systemDefault()));
+      Pair<Boolean, String> result = doSampleWrites(jsc, records, writeConfig, instantTime);
+      if (result.getLeft()) {
+        long avgSize = getAvgSizeFromSampleWrites(jsc, result.getRight());
+        LOG.info("Overwriting record size estimate to " + avgSize);
+        TypedProperties props = writeConfig.getProps();
+        props.put(COPY_ON_WRITE_RECORD_SIZE_ESTIMATE.key(), String.valueOf(avgSize));
+        return Option.of(HoodieWriteConfig.newBuilder().withProperties(props).build());
+      }
+    } catch (IOException e) {
+      LOG.error(String.format("Not overwriting record size estimate for table %s due to an error during sample writes.", writeConfig.getTableName()), e);
+    }
+    return Option.empty();
+  }
+
+  private static Pair<Boolean, String> doSampleWrites(JavaSparkContext jsc, 
JavaRDD<HoodieRecord> records, HoodieWriteConfig writeConfig, String 
instantTime)
+      throws IOException {
+    final String sampleWritesBasePath = getSampleWritesBasePath(jsc, 
writeConfig, instantTime);
+    HoodieTableMetaClient.withPropertyBuilder()
+        .setTableType(HoodieTableType.COPY_ON_WRITE)
+        .setTableName(String.format("%s_samples_%s", 
writeConfig.getTableName(), instantTime))
+        .setCDCEnabled(false)
+        .initTable(jsc.hadoopConfiguration(), sampleWritesBasePath);
+    TypedProperties props = writeConfig.getProps();
+    props.put(SAMPLE_WRITES_ENABLED.key(), "false");
+    final HoodieWriteConfig sampleWriteConfig = HoodieWriteConfig.newBuilder()
+        .withProps(props)
+        .withTableServicesEnabled(false)
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
+        .withSchemaEvolutionEnable(false)
+        .withBulkInsertParallelism(1)
+        .withAutoCommit(true)
+        .withPath(sampleWritesBasePath)
+        .build();
+    try (SparkRDDWriteClient sampleWriteClient = new SparkRDDWriteClient(new HoodieSparkEngineContext(jsc), sampleWriteConfig, Option.empty())) {
+      int size = writeConfig.getIntOrDefault(SAMPLE_WRITES_SIZE);
+      List<HoodieRecord> samples = records.coalesce(1).take(size);
+      sampleWriteClient.startCommitWithTime(instantTime);
+      JavaRDD<WriteStatus> writeStatusRDD = 
sampleWriteClient.bulkInsert(jsc.parallelize(samples, 1), instantTime);
+      if (writeStatusRDD.filter(WriteStatus::hasErrors).count() > 0) {
+        LOG.error(String.format("sample writes for table %s failed with 
errors.", writeConfig.getTableName()));
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Printing out the top 100 errors");
+          writeStatusRDD.filter(WriteStatus::hasErrors).take(100).forEach(ws -> {
+            LOG.trace("Global error :", ws.getGlobalError());
+            ws.getErrors().forEach((key, throwable) ->
+                LOG.trace(String.format("Error for key: %s", key), throwable));
+          });
+        }
+        return Pair.of(false, null);
+      } else {
+        return Pair.of(true, sampleWritesBasePath);
+      }
+    }
+  }
+
+  private static String getSampleWritesBasePath(JavaSparkContext jsc, HoodieWriteConfig writeConfig, String instantTime) throws IOException {
+    Path basePath = new CachingPath(writeConfig.getBasePath(), SAMPLE_WRITES_FOLDER_PATH + Path.SEPARATOR + instantTime);
+    FileSystem fs = FSUtils.getFs(basePath, jsc.hadoopConfiguration());
+    if (fs.exists(basePath)) {
+      fs.delete(basePath, true);
+    }
+    return basePath.toString();
+  }
+
+  private static long getAvgSizeFromSampleWrites(JavaSparkContext jsc, String sampleWritesBasePath) throws IOException {
+    HoodieTableMetaClient metaClient = getMetaClient(jsc, sampleWritesBasePath);
+    Option<HoodieInstant> lastInstantOpt = metaClient.getCommitTimeline().filterCompletedInstants().lastInstant();
+    checkState(lastInstantOpt.isPresent(), "The only completed instant should be present in the sample_writes table.");
+    HoodieInstant instant = lastInstantOpt.get();
+    HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
+        .fromBytes(metaClient.getCommitTimeline().getInstantDetails(instant).get(), HoodieCommitMetadata.class);
+    long totalBytesWritten = commitMetadata.fetchTotalBytesWritten();
+    long totalRecordsWritten = commitMetadata.fetchTotalRecordsWritten();
+    return (long) Math.ceil((1.0 * totalBytesWritten) / totalRecordsWritten);
+  }
+
+  private static HoodieTableMetaClient getMetaClient(JavaSparkContext jsc, String basePath) {
+    FileSystem fs = FSUtils.getFs(basePath, jsc.hadoopConfiguration());
+    return HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(basePath).build();
+  }
+}
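
The estimate itself is plain arithmetic over the sample commit's metadata: avgSize = ceil(totalBytesWritten / totalRecordsWritten). With hypothetical totals of, say, 3,895,000 bytes written across 5,000 sampled records, the estimate would be ceil(779.0) = 779 bytes per record, the same ballpark the test below asserts for the trip example schema.
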
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestSparkSampleWritesUtils.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestSparkSampleWritesUtils.java
new file mode 100644
index 00000000000..3a7516aab24
--- /dev/null
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestSparkSampleWritesUtils.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.deltastreamer;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.testutils.HoodieTestTable;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
+import org.apache.hudi.utilities.config.HoodieDeltaStreamerConfig;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestSparkSampleWritesUtils extends SparkClientFunctionalTestHarness {
+
+  private HoodieTestDataGenerator dataGen;
+  private HoodieTableMetaClient metaClient;
+
+  @BeforeEach
+  public void setUp() throws IOException {
+    dataGen = new HoodieTestDataGenerator(0xDEED);
+    metaClient = getHoodieMetaClient(HoodieTableType.COPY_ON_WRITE);
+  }
+
+  @AfterEach
+  public void tearDown() {
+    dataGen.close();
+  }
+
+  /*
+   * TODO remove this and fix parent class (HUDI-6042)
+   */
+  @Override
+  public String basePath() {
+    return tempDir.toAbsolutePath().toString();
+  }
+
+  @Test
+  public void skipOverwriteRecordSizeEstimateWhenTimelineNonEmpty() throws Exception {
+    String commitTime = HoodieTestTable.makeNewCommitTime();
+    HoodieTestTable.of(metaClient).addCommit(commitTime);
+    int originalRecordSize = 100;
+    TypedProperties props = new TypedProperties();
+    props.put(HoodieDeltaStreamerConfig.SAMPLE_WRITES_ENABLED.key(), "true");
+    props.put(HoodieCompactionConfig.COPY_ON_WRITE_RECORD_SIZE_ESTIMATE.key(), String.valueOf(originalRecordSize));
+    HoodieWriteConfig originalWriteConfig = HoodieWriteConfig.newBuilder()
+        .withProperties(props)
+        .withPath(basePath())
+        .build();
+    JavaRDD<HoodieRecord> records = jsc().parallelize(dataGen.generateInserts(commitTime, 1), 1);
+    Option<HoodieWriteConfig> writeConfigOpt = SparkSampleWritesUtils.getWriteConfigWithRecordSizeEstimate(jsc(), records, originalWriteConfig);
+    assertFalse(writeConfigOpt.isPresent());
+    assertEquals(originalRecordSize, originalWriteConfig.getCopyOnWriteRecordSizeEstimate(), "Original record size estimate should not be changed.");
+  }
+
+  @Test
+  public void overwriteRecordSizeEstimateForEmptyTable() {
+    int originalRecordSize = 100;
+    TypedProperties props = new TypedProperties();
+    props.put(HoodieDeltaStreamerConfig.SAMPLE_WRITES_ENABLED.key(), "true");
+    props.put(HoodieCompactionConfig.COPY_ON_WRITE_RECORD_SIZE_ESTIMATE.key(), String.valueOf(originalRecordSize));
+    HoodieWriteConfig originalWriteConfig = HoodieWriteConfig.newBuilder()
+        .withProperties(props)
+        .forTable("foo")
+        .withPath(basePath())
+        .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
+        .build();
+
+    String commitTime = HoodieTestDataGenerator.getCommitTimeAtUTC(1);
+    JavaRDD<HoodieRecord> records = 
jsc().parallelize(dataGen.generateInserts(commitTime, 2000), 2);
+    Option<HoodieWriteConfig> writeConfigOpt = 
SparkSampleWritesUtils.getWriteConfigWithRecordSizeEstimate(jsc(), records, 
originalWriteConfig);
+    assertTrue(writeConfigOpt.isPresent());
+    assertEquals(779.0, 
writeConfigOpt.get().getCopyOnWriteRecordSizeEstimate(), 10.0);
+  }
+}
