This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 40806a67a6 Spark: Minor fixes for CreateChangelogViewProcedure (#7072)
40806a67a6 is described below

commit 40806a67a673d0d8f33e31b054c76f1e590093a1
Author: Yufei Gu <[email protected]>
AuthorDate: Fri Mar 10 15:29:38 2023 -0800

    Spark: Minor fixes for CreateChangelogViewProcedure (#7072)
---
 .../iceberg/spark/procedures/BaseProcedure.java    |  3 +--
 .../procedures/CreateChangelogViewProcedure.java   | 25 +++++++++++-----------
 .../iceberg/spark/procedures/BaseProcedure.java    |  3 +--
 .../procedures/CreateChangelogViewProcedure.java   | 25 +++++++++++-----------
 4 files changed, 26 insertions(+), 30 deletions(-)

diff --git 
a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/BaseProcedure.java
 
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/BaseProcedure.java
index 1babe4efae..ed0156adc5 100644
--- 
a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/BaseProcedure.java
+++ 
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/BaseProcedure.java
@@ -148,9 +148,8 @@ abstract class BaseProcedure implements Procedure {
     }
   }
 
-  protected Dataset<Row> loadDataSetFromTable(Identifier tableIdent, 
Map<String, String> options) {
+  protected Dataset<Row> loadRows(Identifier tableIdent, Map<String, String> 
options) {
     String tableName = Spark3Util.quotedFullIdentifier(tableCatalog().name(), 
tableIdent);
-    // no need to validate the read options here since the reader will 
validate them
     return spark().read().options(options).table(tableName);
   }
 
diff --git 
a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java
 
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java
index ab844e08e9..f5bc1310ba 100644
--- 
a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java
+++ 
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java
@@ -134,7 +134,7 @@ public class CreateChangelogViewProcedure extends 
BaseProcedure {
 
     // load insert and deletes from the changelog table
     Identifier changelogTableIdent = changelogTableIdent(tableIdent);
-    Dataset<Row> df = loadDataSetFromTable(changelogTableIdent, options(args));
+    Dataset<Row> df = loadRows(changelogTableIdent, options(args));
 
     if (shouldComputeUpdateImages(args)) {
       df = computeUpdateImages(identifierColumns(args, tableIdent), df);
@@ -152,16 +152,15 @@ public class CreateChangelogViewProcedure extends 
BaseProcedure {
   private Dataset<Row> computeUpdateImages(String[] identifierColumns, 
Dataset<Row> df) {
     Preconditions.checkArgument(
         identifierColumns.length > 0,
-        "Cannot compute the update-rows because identifier columns are not 
set");
+        "Cannot compute the update images because identifier columns are not 
set");
 
-    Column[] repartitionColumns = new Column[identifierColumns.length + 1];
+    Column[] repartitionSpec = new Column[identifierColumns.length + 1];
     for (int i = 0; i < identifierColumns.length; i++) {
-      repartitionColumns[i] = df.col(identifierColumns[i]);
+      repartitionSpec[i] = df.col(identifierColumns[i]);
     }
-    repartitionColumns[repartitionColumns.length - 1] =
-        df.col(MetadataColumns.CHANGE_ORDINAL.name());
+    repartitionSpec[repartitionSpec.length - 1] = 
df.col(MetadataColumns.CHANGE_ORDINAL.name());
 
-    return applyChangelogIterator(df, repartitionColumns);
+    return applyChangelogIterator(df, repartitionSpec);
   }
 
   private boolean shouldComputeUpdateImages(InternalRow args) {
@@ -182,12 +181,12 @@ public class CreateChangelogViewProcedure extends 
BaseProcedure {
   }
 
   private Dataset<Row> removeCarryoverRows(Dataset<Row> df) {
-    Column[] repartitionColumns =
+    Column[] repartitionSpec =
         Arrays.stream(df.columns())
             .filter(c -> !c.equals(MetadataColumns.CHANGE_TYPE.name()))
             .map(df::col)
             .toArray(Column[]::new);
-    return applyChangelogIterator(df, repartitionColumns);
+    return applyChangelogIterator(df, repartitionSpec);
   }
 
   private String[] identifierColumns(InternalRow args, Identifier tableIdent) {
@@ -233,13 +232,13 @@ public class CreateChangelogViewProcedure extends 
BaseProcedure {
     }
   }
 
-  private Dataset<Row> applyChangelogIterator(Dataset<Row> df, Column[] 
repartitionColumns) {
-    Column[] sortSpec = sortSpec(df, repartitionColumns);
+  private Dataset<Row> applyChangelogIterator(Dataset<Row> df, Column[] 
repartitionSpec) {
+    Column[] sortSpec = sortSpec(df, repartitionSpec);
     StructType schema = df.schema();
     String[] identifierFields =
-        
Arrays.stream(repartitionColumns).map(Column::toString).toArray(String[]::new);
+        
Arrays.stream(repartitionSpec).map(Column::toString).toArray(String[]::new);
 
-    return df.repartition(repartitionColumns)
+    return df.repartition(repartitionSpec)
         .sortWithinPartitions(sortSpec)
         .mapPartitions(
             (MapPartitionsFunction<Row, Row>)
diff --git 
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/BaseProcedure.java
 
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/BaseProcedure.java
index 1babe4efae..ed0156adc5 100644
--- 
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/BaseProcedure.java
+++ 
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/BaseProcedure.java
@@ -148,9 +148,8 @@ abstract class BaseProcedure implements Procedure {
     }
   }
 
-  protected Dataset<Row> loadDataSetFromTable(Identifier tableIdent, 
Map<String, String> options) {
+  protected Dataset<Row> loadRows(Identifier tableIdent, Map<String, String> 
options) {
     String tableName = Spark3Util.quotedFullIdentifier(tableCatalog().name(), 
tableIdent);
-    // no need to validate the read options here since the reader will 
validate them
     return spark().read().options(options).table(tableName);
   }
 
diff --git 
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java
 
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java
index b47cc0de0b..3234b7f714 100644
--- 
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java
+++ 
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java
@@ -140,7 +140,7 @@ public class CreateChangelogViewProcedure extends 
BaseProcedure {
 
     // load insert and deletes from the changelog table
     Identifier changelogTableIdent = changelogTableIdent(tableIdent);
-    Dataset<Row> df = loadDataSetFromTable(changelogTableIdent, 
options(input));
+    Dataset<Row> df = loadRows(changelogTableIdent, options(input));
 
     if (shouldComputeUpdateImages(input)) {
       df = computeUpdateImages(identifierColumns(input, tableIdent), df);
@@ -158,16 +158,15 @@ public class CreateChangelogViewProcedure extends 
BaseProcedure {
   private Dataset<Row> computeUpdateImages(String[] identifierColumns, 
Dataset<Row> df) {
     Preconditions.checkArgument(
         identifierColumns.length > 0,
-        "Cannot compute the update-rows because identifier columns are not 
set");
+        "Cannot compute the update images because identifier columns are not 
set");
 
-    Column[] repartitionColumns = new Column[identifierColumns.length + 1];
+    Column[] repartitionSpec = new Column[identifierColumns.length + 1];
     for (int i = 0; i < identifierColumns.length; i++) {
-      repartitionColumns[i] = df.col(identifierColumns[i]);
+      repartitionSpec[i] = df.col(identifierColumns[i]);
     }
-    repartitionColumns[repartitionColumns.length - 1] =
-        df.col(MetadataColumns.CHANGE_ORDINAL.name());
+    repartitionSpec[repartitionSpec.length - 1] = 
df.col(MetadataColumns.CHANGE_ORDINAL.name());
 
-    return applyChangelogIterator(df, repartitionColumns);
+    return applyChangelogIterator(df, repartitionSpec);
   }
 
   private boolean shouldComputeUpdateImages(ProcedureInput input) {
@@ -181,12 +180,12 @@ public class CreateChangelogViewProcedure extends 
BaseProcedure {
   }
 
   private Dataset<Row> removeCarryoverRows(Dataset<Row> df) {
-    Column[] repartitionColumns =
+    Column[] repartitionSpec =
         Arrays.stream(df.columns())
             .filter(c -> !c.equals(MetadataColumns.CHANGE_TYPE.name()))
             .map(df::col)
             .toArray(Column[]::new);
-    return applyChangelogIterator(df, repartitionColumns);
+    return applyChangelogIterator(df, repartitionSpec);
   }
 
   private String[] identifierColumns(ProcedureInput input, Identifier 
tableIdent) {
@@ -214,13 +213,13 @@ public class CreateChangelogViewProcedure extends 
BaseProcedure {
     return input.string(CHANGELOG_VIEW_PARAM, defaultValue);
   }
 
-  private Dataset<Row> applyChangelogIterator(Dataset<Row> df, Column[] 
repartitionColumns) {
-    Column[] sortSpec = sortSpec(df, repartitionColumns);
+  private Dataset<Row> applyChangelogIterator(Dataset<Row> df, Column[] 
repartitionSpec) {
+    Column[] sortSpec = sortSpec(df, repartitionSpec);
     StructType schema = df.schema();
     String[] identifierFields =
-        
Arrays.stream(repartitionColumns).map(Column::toString).toArray(String[]::new);
+        
Arrays.stream(repartitionSpec).map(Column::toString).toArray(String[]::new);
 
-    return df.repartition(repartitionColumns)
+    return df.repartition(repartitionSpec)
         .sortWithinPartitions(sortSpec)
         .mapPartitions(
             (MapPartitionsFunction<Row, Row>)

Reply via email to