aokolnychyi commented on a change in pull request #3461:
URL: https://github.com/apache/iceberg/pull/3461#discussion_r742215843
##########
File path: core/src/main/java/org/apache/iceberg/util/SortOrderUtil.java
##########
@@ -44,7 +44,11 @@ public static SortOrder buildSortOrder(Table table) {
return buildSortOrder(table.schema(), table.spec(), table.sortOrder());
}
- public static SortOrder buildSortOrder(Schema schema, PartitionSpec spec, SortOrder sortOrder) {
+ public static SortOrder buildSortOrder(Table table, SortOrder sortOrder) {
Review comment:
I had to introduce this overload so that a few call sites could stay on a single line.
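For reference, a minimal sketch of what the new overload can look like, assuming it simply delegates to the three-argument variant (the body isn't shown in this hunk):

```java
public static SortOrder buildSortOrder(Table table, SortOrder sortOrder) {
  // Delegate to the existing overload using the table's own schema and spec.
  return buildSortOrder(table.schema(), table.spec(), sortOrder);
}
```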
##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java
##########
@@ -302,74 +293,39 @@ public Transform unknown(int fieldId, String sourceName, int sourceId, String tr
     return transforms.stream().filter(Objects::nonNull).toArray(Transform[]::new);
   }
- public static Distribution buildRequiredDistribution(org.apache.iceberg.Table table) {
Review comment:
I moved the distribution- and order-specific logic into a separate class, as we will also handle copy-on-write and merge-on-read scenarios.
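To illustrate the extraction, here is a hypothetical outline of such a helper; the class and method names are illustrative, not necessarily the ones used in the PR:

```java
import org.apache.iceberg.Table;
import org.apache.spark.sql.connector.distributions.Distribution;
import org.apache.spark.sql.connector.distributions.Distributions;
import org.apache.spark.sql.connector.expressions.SortOrder;

// Hypothetical home for the logic that used to live in Spark3Util; separate
// entry points leave room for copy-on-write and merge-on-read variants later.
class SparkDistributionAndOrderingUtil {
  private SparkDistributionAndOrderingUtil() {
  }

  static Distribution buildRequiredDistribution(Table table) {
    // Placeholder: the real logic derives a distribution from the table's
    // partition spec and configured write distribution mode.
    return Distributions.unspecified();
  }

  static SortOrder[] buildRequiredOrdering(Table table, Distribution distribution) {
    // Placeholder: the real logic converts the table's sort order into
    // Spark connector SortOrder expressions.
    return new SortOrder[0];
  }
}
```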
##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
##########
@@ -207,12 +206,7 @@ public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
@Override
public WriteBuilder newWriteBuilder(LogicalWriteInfo info) {
- if (info.options().containsKey(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID)) {
Review comment:
I removed `SparkRewriteBuilder` and moved its logic into `SparkWriteBuilder`. They are not that different and share most of the code.
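A sketch of the consolidated flow, assuming `SparkWriteBuilder` now reads the rewrite option from the write config itself so that `SparkTable` no longer branches (exact constructor wiring may differ):

```java
@Override
public WriteBuilder newWriteBuilder(LogicalWriteInfo info) {
  // No special-casing here anymore: SparkWriteBuilder checks
  // REWRITTEN_FILE_SCAN_TASK_SET_ID internally and produces a rewrite
  // write when that option is set.
  return new SparkWriteBuilder(sparkSession(), icebergTable, info);
}
```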
##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/Spark3BinPackStrategy.java
##########
@@ -72,6 +72,7 @@ public Table table() {
.format("iceberg")
.option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, groupID)
.option(SparkWriteOptions.TARGET_FILE_SIZE_BYTES, writeMaxFileSize())
+ .option(SparkWriteOptions.DISTRIBUTION_MODE, "none")
Review comment:
We still request a local sort for bin-packing, based on the table's defined sort order.
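For context, an illustrative mapping from Iceberg's `DistributionMode` to Spark's `Distribution` API: with `none`, no shuffle is requested, while the local sort still comes from the required ordering. This exact switch is my assumption, not a quote from the PR:

```java
import org.apache.iceberg.DistributionMode;
import org.apache.spark.sql.connector.distributions.Distribution;
import org.apache.spark.sql.connector.distributions.Distributions;
import org.apache.spark.sql.connector.expressions.Expression;
import org.apache.spark.sql.connector.expressions.SortOrder;

static Distribution toRequiredDistribution(DistributionMode mode, Expression[] clustering, SortOrder[] ordering) {
  switch (mode) {
    case NONE:
      return Distributions.unspecified(); // no shuffle; each task keeps its data
    case HASH:
      return Distributions.clustered(clustering); // shuffle by partition expressions
    case RANGE:
      return Distributions.ordered(ordering); // range-partition by the sort order
    default:
      throw new IllegalArgumentException("Unknown distribution mode: " + mode);
  }
}
```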
##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/Spark3SortStrategy.java
##########
@@ -137,7 +137,9 @@ public RewriteStrategy options(Map<String, String> options) {
sortedDf.write()
.format("iceberg")
.option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, groupID)
- .option(RewriteDataFiles.TARGET_FILE_SIZE_BYTES, writeMaxFileSize())
+ .option(SparkWriteOptions.TARGET_FILE_SIZE_BYTES, writeMaxFileSize())
Review comment:
Switched to `SparkWriteOptions` instead of `RewriteDataFiles` here; I think it was a typo.
The underlying values are the same, so the previous code worked anyway.
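To back up the claim about the values, a quick check; both constants should resolve to the same option key, `target-file-size-bytes` (worth verifying against the actual sources):

```java
import org.apache.iceberg.actions.RewriteDataFiles;
import org.apache.iceberg.spark.SparkWriteOptions;

class TargetFileSizeKeyCheck {
  public static void main(String[] args) {
    // Prints true if the two constants carry the same option key, which is
    // why the previous code happened to work despite the mixed-up class.
    System.out.println(SparkWriteOptions.TARGET_FILE_SIZE_BYTES
        .equals(RewriteDataFiles.TARGET_FILE_SIZE_BYTES));
  }
}
```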
##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWriteBuilder.java
##########
@@ -64,13 +75,18 @@
this.writeInfo = info;
this.dsSchema = info.schema();
this.overwriteMode = writeConf.overwriteMode();
+ this.rewrittenFileSetId = writeConf.rewrittenFileSetId();
this.handleTimestampWithoutZone = writeConf.handleTimestampWithoutZone();
+ this.requestDistributionAndOrdering = allIdentityTransforms(table.spec());
Review comment:
I'll add a check for whether extensions are enabled later. The plan is to have a rule that builds Iceberg writes and translates all connector transforms to Iceberg transforms. That can only be done if the Iceberg extensions are enabled; if not, we can only request a distribution and ordering when all partition transforms are identity transforms.
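A plausible shape for the identity check (my sketch; it assumes `Transform#isIdentity()` is available on the partition fields' transforms):

```java
import org.apache.iceberg.PartitionSpec;

private static boolean allIdentityTransforms(PartitionSpec spec) {
  // Without the Iceberg SQL extensions, Spark can only reference identity
  // partition columns directly, so distribution and ordering are requested
  // only when every partition transform is an identity transform.
  return spec.fields().stream().allMatch(field -> field.transform().isIdentity());
}
```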
##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
##########
@@ -117,6 +124,18 @@
this.dsSchema = dsSchema;
this.extraSnapshotMetadata = writeConf.extraSnapshotMetadata();
this.partitionedFanoutEnabled = writeConf.fanoutWriterEnabled();
+ this.requiredDistribution = requiredDistribution;
+ this.requiredOrdering = requiredOrdering;
+ }
+
+ @Override
Review comment:
I don't let users request a fixed number of shuffle partitions. I hope AQE will be smart enough to make a better choice.
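Concretely, in Spark 3.2's `RequiresDistributionAndOrdering` interface, returning 0 from `requiredNumPartitions()` means no specific partition count is required, which leaves the shuffle partition count to Spark and thus to AQE. A minimal sketch:

```java
@Override
public int requiredNumPartitions() {
  // 0 = no requirement: Spark's adaptive query execution remains free to
  // choose the number of shuffle partitions.
  return 0;
}
```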
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]