alexeykudinkin commented on code in PR #6046:
URL: https://github.com/apache/hudi/pull/6046#discussion_r974679847
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala:
##########
@@ -105,6 +108,52 @@ object HoodieDatasetBulkInsertHelper extends Logging {
partitioner.repartitionRecords(trimmedDF,
config.getBulkInsertShuffleParallelism)
}
+ /**
+ * Perform bulk insert for [[Dataset<Row>]], will not change timeline/index,
return
+ * information about write files.
+ */
+ def bulkInsert(dataset: Dataset[Row],
+ instantTime: String,
+ table: HoodieTable[_ <: HoodieRecordPayload[_ <:
HoodieRecordPayload[_ <: AnyRef]], _, _, _],
+ writeConfig: HoodieWriteConfig,
+ partitioner: BulkInsertPartitioner[Dataset[Row]],
+ parallelism: Int,
+ shouldPreserveHoodieMetadata: Boolean):
HoodieData[WriteStatus] = {
+ val repartitionedDataset = partitioner.repartitionRecords(dataset,
parallelism)
+ val arePartitionRecordsSorted = partitioner.arePartitionRecordsSorted
+ val schema = dataset.schema
+ val writeStatuses =
repartitionedDataset.queryExecution.toRdd.mapPartitions(iter => {
+ val taskContextSupplier: TaskContextSupplier =
table.getTaskContextSupplier
+ val taskPartitionId = taskContextSupplier.getPartitionIdSupplier.get
+ val taskId = taskContextSupplier.getStageIdSupplier.get.toLong
+ val taskEpochId = taskContextSupplier.getAttemptIdSupplier.get
+ val writer = new BulkInsertDataInternalWriterHelper(
+ table,
+ writeConfig,
+ instantTime,
+ taskPartitionId,
+ taskId,
+ taskEpochId,
+ schema,
+ writeConfig.populateMetaFields,
+ arePartitionRecordsSorted,
+ shouldPreserveHoodieMetadata)
+
+ try {
+ iter.foreach(writer.write)
+ } catch {
+ case t: Throwable =>
+ writer.abort()
+ throw t
+ } finally {
+ writer.close()
+ }
+
+ writer.getWriteStatuses.asScala.map(_.toWriteStatus).iterator
+ }).collect()
+ table.getContext.parallelize(writeStatuses.toList.asJava)
Review Comment:
nit: no need for `toList`
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkAdapterSupport.scala:
##########
@@ -26,17 +26,6 @@ import org.apache.spark.sql.hudi.SparkAdapter
*/
trait SparkAdapterSupport {
- lazy val sparkAdapter: SparkAdapter = {
Review Comment:
Instead of moving this to Java, let's do the following:
- Create a companion object `SparkAdapterSupport`
- Move this conditional there
- Keep this val (for compatibility) referencing the static one from the object
##########
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java:
##########
@@ -183,8 +183,17 @@ public boolean accept(Path path) {
metaClientCache.put(baseDir.toString(), metaClient);
}
- fsView =
FileSystemViewManager.createInMemoryFileSystemView(engineContext,
- metaClient,
HoodieInputFormatUtils.buildMetadataConfig(getConf()));
+ if (getConf().get("as.of.instant") != null) {
Review Comment:
Good catch!
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveSortPartitioner.java:
##########
@@ -91,27 +81,4 @@ public JavaRDD<HoodieRecord<T>>
repartitionRecords(JavaRDD<HoodieRecord<T>> reco
return hoodieRecord;
});
}
-
- private Dataset<Row> reorder(Dataset<Row> dataset, int numOutputGroups) {
Review Comment:
Thanks for cleaning that up!
##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieSparkMergeOnReadTableClustering.java:
##########
@@ -60,20 +61,30 @@ class TestHoodieSparkMergeOnReadTableClustering extends
SparkClientFunctionalTes
private static Stream<Arguments> testClustering() {
return Stream.of(
- Arguments.of(true, true, true),
- Arguments.of(true, true, false),
- Arguments.of(true, false, true),
- Arguments.of(true, false, false),
- Arguments.of(false, true, true),
- Arguments.of(false, true, false),
- Arguments.of(false, false, true),
- Arguments.of(false, false, false)
- );
+ Arrays.asList(true, true, true),
+ Arrays.asList(true, true, false),
+ Arrays.asList(true, false, true),
+ Arrays.asList(true, false, false),
+ Arrays.asList(false, true, true),
+ Arrays.asList(false, true, false),
+ Arrays.asList(false, false, true),
+ Arrays.asList(false, false, false))
+ .flatMap(arguments -> {
+ ArrayList<Boolean> enableRowClusteringArgs = new ArrayList<>();
+ enableRowClusteringArgs.add(true);
+ enableRowClusteringArgs.addAll(arguments);
+ ArrayList<Boolean> disableRowClusteringArgs = new ArrayList<>();
+ disableRowClusteringArgs.add(false);
Review Comment:
Appreciate your intent, but if we chained every parameter like that, this
code would become unreadable. Let's just add one more column for the
parameter (and add a comment at the top enumerating all the params in this
matrix)
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala:
##########
@@ -110,44 +110,9 @@ class DefaultSource extends RelationProvider
val isBootstrappedTable =
metaClient.getTableConfig.getBootstrapBasePath.isPresent
Review Comment:
Let's move these statements (up to line #109) into
`DefaultSource.createRelation` to make a cleaner cut (all these vars are used
in that method rather than here)
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala:
##########
@@ -147,6 +150,15 @@ trait SparkAdapter extends Serializable {
*/
def createInterpretedPredicate(e: Expression): InterpretedPredicate
+ /**
+ * Create Hoodie relation based on globPaths, otherwise use tablePath if
it's empty
+ */
+ def createRelation(metaClient: HoodieTableMetaClient,
+ sqlContext: SQLContext,
Review Comment:
nit: Rule of thumb is to put anything context-like as first param
(`SQLContext`)
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/JavaSparkAdaptorSupport.java:
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi;
+
+import org.apache.hudi.exception.HoodieException;
+import org.apache.spark.sql.hudi.SparkAdapter;
+
+/**
+ * Java implementation to provide SparkAdapter when we need to adapt
+ * the difference between spark2 and spark3.
+ */
+public class JavaSparkAdaptorSupport {
+
+ private JavaSparkAdaptorSupport() {}
+
+ private static class AdapterSupport {
+
+ private static final SparkAdapter ADAPTER = new
AdapterSupport().sparkAdapter();
+
+ private SparkAdapter sparkAdapter() {
+ String adapterClass;
+ if (HoodieSparkUtils.isSpark3_3()) {
Review Comment:
We should keep this one in Scala though -- see my comment below
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala:
##########
@@ -160,9 +160,6 @@ class BaseFileOnlyRelation(sqlContext: SQLContext,
fileFormat = fileFormat,
optParams)(sparkSession)
} else {
- val readPathsStr = optParams.get(DataSourceReadOptions.READ_PATHS.key)
Review Comment:
This will be globbed w/in DataSource:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala#L569
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]