This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 40806a67a6 Spark: Minor fixes for CreateChangelogViewProcedure (#7072)
40806a67a6 is described below
commit 40806a67a673d0d8f33e31b054c76f1e590093a1
Author: Yufei Gu <[email protected]>
AuthorDate: Fri Mar 10 15:29:38 2023 -0800
Spark: Minor fixes for CreateChangelogViewProcedure (#7072)
---
.../iceberg/spark/procedures/BaseProcedure.java | 3 +--
.../procedures/CreateChangelogViewProcedure.java | 25 +++++++++++-----------
.../iceberg/spark/procedures/BaseProcedure.java | 3 +--
.../procedures/CreateChangelogViewProcedure.java | 25 +++++++++++-----------
4 files changed, 26 insertions(+), 30 deletions(-)
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/BaseProcedure.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/BaseProcedure.java
index 1babe4efae..ed0156adc5 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/BaseProcedure.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/BaseProcedure.java
@@ -148,9 +148,8 @@ abstract class BaseProcedure implements Procedure {
}
}
-  protected Dataset<Row> loadDataSetFromTable(Identifier tableIdent, Map<String, String> options) {
+  protected Dataset<Row> loadRows(Identifier tableIdent, Map<String, String> options) {
     String tableName = Spark3Util.quotedFullIdentifier(tableCatalog().name(), tableIdent);
-    // no need to validate the read options here since the reader will validate them
return spark().read().options(options).table(tableName);
}
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java
index ab844e08e9..f5bc1310ba 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java
@@ -134,7 +134,7 @@ public class CreateChangelogViewProcedure extends BaseProcedure {
// load insert and deletes from the changelog table
Identifier changelogTableIdent = changelogTableIdent(tableIdent);
- Dataset<Row> df = loadDataSetFromTable(changelogTableIdent, options(args));
+ Dataset<Row> df = loadRows(changelogTableIdent, options(args));
if (shouldComputeUpdateImages(args)) {
df = computeUpdateImages(identifierColumns(args, tableIdent), df);
@@ -152,16 +152,15 @@ public class CreateChangelogViewProcedure extends BaseProcedure {
   private Dataset<Row> computeUpdateImages(String[] identifierColumns, Dataset<Row> df) {
Preconditions.checkArgument(
identifierColumns.length > 0,
-        "Cannot compute the update-rows because identifier columns are not set");
+        "Cannot compute the update images because identifier columns are not set");
- Column[] repartitionColumns = new Column[identifierColumns.length + 1];
+ Column[] repartitionSpec = new Column[identifierColumns.length + 1];
for (int i = 0; i < identifierColumns.length; i++) {
- repartitionColumns[i] = df.col(identifierColumns[i]);
+ repartitionSpec[i] = df.col(identifierColumns[i]);
}
- repartitionColumns[repartitionColumns.length - 1] =
- df.col(MetadataColumns.CHANGE_ORDINAL.name());
+    repartitionSpec[repartitionSpec.length - 1] = df.col(MetadataColumns.CHANGE_ORDINAL.name());
- return applyChangelogIterator(df, repartitionColumns);
+ return applyChangelogIterator(df, repartitionSpec);
}
private boolean shouldComputeUpdateImages(InternalRow args) {
@@ -182,12 +181,12 @@ public class CreateChangelogViewProcedure extends BaseProcedure {
}
private Dataset<Row> removeCarryoverRows(Dataset<Row> df) {
- Column[] repartitionColumns =
+ Column[] repartitionSpec =
Arrays.stream(df.columns())
.filter(c -> !c.equals(MetadataColumns.CHANGE_TYPE.name()))
.map(df::col)
.toArray(Column[]::new);
- return applyChangelogIterator(df, repartitionColumns);
+ return applyChangelogIterator(df, repartitionSpec);
}
private String[] identifierColumns(InternalRow args, Identifier tableIdent) {
@@ -233,13 +232,13 @@ public class CreateChangelogViewProcedure extends BaseProcedure {
}
}
-  private Dataset<Row> applyChangelogIterator(Dataset<Row> df, Column[] repartitionColumns) {
-    Column[] sortSpec = sortSpec(df, repartitionColumns);
+  private Dataset<Row> applyChangelogIterator(Dataset<Row> df, Column[] repartitionSpec) {
+    Column[] sortSpec = sortSpec(df, repartitionSpec);
StructType schema = df.schema();
String[] identifierFields =
-        Arrays.stream(repartitionColumns).map(Column::toString).toArray(String[]::new);
+        Arrays.stream(repartitionSpec).map(Column::toString).toArray(String[]::new);
- return df.repartition(repartitionColumns)
+ return df.repartition(repartitionSpec)
.sortWithinPartitions(sortSpec)
.mapPartitions(
(MapPartitionsFunction<Row, Row>)
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/BaseProcedure.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/BaseProcedure.java
index 1babe4efae..ed0156adc5 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/BaseProcedure.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/BaseProcedure.java
@@ -148,9 +148,8 @@ abstract class BaseProcedure implements Procedure {
}
}
-  protected Dataset<Row> loadDataSetFromTable(Identifier tableIdent, Map<String, String> options) {
+  protected Dataset<Row> loadRows(Identifier tableIdent, Map<String, String> options) {
     String tableName = Spark3Util.quotedFullIdentifier(tableCatalog().name(), tableIdent);
-    // no need to validate the read options here since the reader will validate them
return spark().read().options(options).table(tableName);
}
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java
index b47cc0de0b..3234b7f714 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java
@@ -140,7 +140,7 @@ public class CreateChangelogViewProcedure extends BaseProcedure {
// load insert and deletes from the changelog table
Identifier changelogTableIdent = changelogTableIdent(tableIdent);
-    Dataset<Row> df = loadDataSetFromTable(changelogTableIdent, options(input));
+ Dataset<Row> df = loadRows(changelogTableIdent, options(input));
if (shouldComputeUpdateImages(input)) {
df = computeUpdateImages(identifierColumns(input, tableIdent), df);
@@ -158,16 +158,15 @@ public class CreateChangelogViewProcedure extends BaseProcedure {
   private Dataset<Row> computeUpdateImages(String[] identifierColumns, Dataset<Row> df) {
Preconditions.checkArgument(
identifierColumns.length > 0,
-        "Cannot compute the update-rows because identifier columns are not set");
+        "Cannot compute the update images because identifier columns are not set");
- Column[] repartitionColumns = new Column[identifierColumns.length + 1];
+ Column[] repartitionSpec = new Column[identifierColumns.length + 1];
for (int i = 0; i < identifierColumns.length; i++) {
- repartitionColumns[i] = df.col(identifierColumns[i]);
+ repartitionSpec[i] = df.col(identifierColumns[i]);
}
- repartitionColumns[repartitionColumns.length - 1] =
- df.col(MetadataColumns.CHANGE_ORDINAL.name());
+    repartitionSpec[repartitionSpec.length - 1] = df.col(MetadataColumns.CHANGE_ORDINAL.name());
- return applyChangelogIterator(df, repartitionColumns);
+ return applyChangelogIterator(df, repartitionSpec);
}
private boolean shouldComputeUpdateImages(ProcedureInput input) {
@@ -181,12 +180,12 @@ public class CreateChangelogViewProcedure extends BaseProcedure {
}
private Dataset<Row> removeCarryoverRows(Dataset<Row> df) {
- Column[] repartitionColumns =
+ Column[] repartitionSpec =
Arrays.stream(df.columns())
.filter(c -> !c.equals(MetadataColumns.CHANGE_TYPE.name()))
.map(df::col)
.toArray(Column[]::new);
- return applyChangelogIterator(df, repartitionColumns);
+ return applyChangelogIterator(df, repartitionSpec);
}
   private String[] identifierColumns(ProcedureInput input, Identifier tableIdent) {
@@ -214,13 +213,13 @@ public class CreateChangelogViewProcedure extends BaseProcedure {
return input.string(CHANGELOG_VIEW_PARAM, defaultValue);
}
-  private Dataset<Row> applyChangelogIterator(Dataset<Row> df, Column[] repartitionColumns) {
-    Column[] sortSpec = sortSpec(df, repartitionColumns);
+  private Dataset<Row> applyChangelogIterator(Dataset<Row> df, Column[] repartitionSpec) {
+    Column[] sortSpec = sortSpec(df, repartitionSpec);
StructType schema = df.schema();
String[] identifierFields =
-        Arrays.stream(repartitionColumns).map(Column::toString).toArray(String[]::new);
+        Arrays.stream(repartitionSpec).map(Column::toString).toArray(String[]::new);
- return df.repartition(repartitionColumns)
+ return df.repartition(repartitionSpec)
.sortWithinPartitions(sortSpec)
.mapPartitions(
(MapPartitionsFunction<Row, Row>)