szehon-ho commented on code in PR #13106:
URL: https://github.com/apache/iceberg/pull/13106#discussion_r2181343022
##########
spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCherrypickSnapshotProcedure.java:
##########
@@ -168,22 +168,26 @@ public void testInvalidCherrypickSnapshotCases() {
assertThatThrownBy(
() -> sql("CALL %s.system.cherrypick_snapshot('n', table => 't',
1L)", catalogName))
.isInstanceOf(AnalysisException.class)
- .hasMessage("Named and positional arguments cannot be mixed");
+ .hasMessage(
+ "[UNEXPECTED_POSITIONAL_ARGUMENT] Cannot invoke routine
`cherrypick_snapshot` because it contains positional argument(s) following the
named argument assigned to `table`; please rearrange them so the positional
arguments come first and then retry the query again. SQLSTATE: 4274K");
assertThatThrownBy(() -> sql("CALL %s.custom.cherrypick_snapshot('n', 't',
1L)", catalogName))
.isInstanceOf(AnalysisException.class)
- .hasMessage("Catalog %s does not support procedures.", catalogName);
+ .hasMessage(
+ "[FAILED_TO_LOAD_ROUTINE] Failed to load routine
`%s`.`custom`.`cherrypick_snapshot`. SQLSTATE: 38000",
+ catalogName);
assertThatThrownBy(() -> sql("CALL %s.system.cherrypick_snapshot('t')",
catalogName))
.isInstanceOf(AnalysisException.class)
- .hasMessage("Missing required parameters: [snapshot_id]");
+ .hasMessage(
+ "[REQUIRED_PARAMETER_NOT_FOUND] Cannot invoke routine
`cherrypick_snapshot` because the parameter named `snapshot_id` is required,
but the routine call did not supply a value. Please update the routine call to
supply an argument value (either positionally at index 0 or by name) and retry
the query again. SQLSTATE: 4274K");
assertThatThrownBy(() -> sql("CALL %s.system.cherrypick_snapshot('', 1L)",
catalogName))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Cannot handle an empty identifier for argument table");
assertThatThrownBy(() -> sql("CALL %s.system.cherrypick_snapshot('t',
2.2)", catalogName))
- .isInstanceOf(AnalysisException.class)
- .hasMessageStartingWith("Wrong arg type for snapshot_id: cannot cast");
+ .isInstanceOf(RuntimeException.class)
Review Comment:
Can we make the test capture the original intent (i.e., assert on the cast exception)?
##########
spark/v4.0/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala:
##########
@@ -76,7 +71,7 @@ case class ExtendedDataSourceV2Strategy(spark: SparkSession)
extends Strategy wi
CreateOrReplaceBranchExec(catalog, ident, branch, branchOptions, create,
replace, ifNotExists) :: Nil
case CreateOrReplaceTag(
- IcebergCatalogAndIdentifier(catalog, ident), tag, tagOptions, create,
replace, ifNotExists) =>
+ IcebergCatalogAndIdentifier(catalog, ident), tag, tagOptions, create,
replace, ifNotExists) =>
Review Comment:
Can we revert this unnecessary formatting change?
##########
spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAddFilesProcedure.java:
##########
@@ -199,8 +199,8 @@ public void addAvroFile() throws Exception {
record2.put("data", "b");
File outputFile = temp.resolve("test.avro").toFile();
- DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter(schema);
- DataFileWriter<GenericRecord> dataFileWriter = new
DataFileWriter(datumWriter);
+ DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
Review Comment:
Is this change related to this PR?
##########
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/BaseProcedure.java:
##########
@@ -178,6 +184,29 @@ protected InternalRow newInternalRow(Object... values) {
return new GenericInternalRow(values);
}
+ protected static class Result implements LocalScan {
+ private final StructType readSchema;
+ private final InternalRow[] rows;
+
+ public Result(StructType readSchema, InternalRow[] rows) {
+ this.readSchema = readSchema;
+ this.rows = rows;
+ }
+
+ public StructType readSchema() {
+ return this.readSchema;
+ }
+
+ @Override
+ public InternalRow[] rows() {
+ return this.rows;
+ }
+ }
+
+ protected Iterator<Scan> asIteratorScan(StructType readSchema,
InternalRow... rows) {
Review Comment:
nit: `scanIterator`? (It's an iterator of scans, so "scan" should modify
"iterator".)
##########
spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRollbackToSnapshotProcedure.java:
##########
@@ -250,27 +250,33 @@ public void testInvalidRollbackToSnapshotCases() {
"CALL %s.system.rollback_to_snapshot(namespace => 'n1',
table => 't', 1L)",
catalogName))
.isInstanceOf(AnalysisException.class)
- .hasMessage("Named and positional arguments cannot be mixed");
+ .hasMessage(
+ "[UNEXPECTED_POSITIONAL_ARGUMENT] Cannot invoke routine
`rollback_to_snapshot` because it contains positional argument(s) following the
named argument assigned to `table`; please rearrange them so the positional
arguments come first and then retry the query again. SQLSTATE: 4274K");
assertThatThrownBy(() -> sql("CALL %s.custom.rollback_to_snapshot('n',
't', 1L)", catalogName))
.isInstanceOf(AnalysisException.class)
- .hasMessage("Catalog %s does not support procedures.", catalogName);
+ .hasMessage(
+ "[FAILED_TO_LOAD_ROUTINE] Failed to load routine
`%s`.`custom`.`rollback_to_snapshot`. SQLSTATE: 38000",
+ catalogName);
assertThatThrownBy(() -> sql("CALL %s.system.rollback_to_snapshot('t')",
catalogName))
.isInstanceOf(AnalysisException.class)
- .hasMessage("Missing required parameters: [snapshot_id]");
+ .hasMessage(
+ "[REQUIRED_PARAMETER_NOT_FOUND] Cannot invoke routine
`rollback_to_snapshot` because the parameter named `snapshot_id` is required,
but the routine call did not supply a value. Please update the routine call to
supply an argument value (either positionally at index 0 or by name) and retry
the query again. SQLSTATE: 4274K");
assertThatThrownBy(() -> sql("CALL %s.system.rollback_to_snapshot(1L)",
catalogName))
.isInstanceOf(AnalysisException.class)
- .hasMessage("Missing required parameters: [snapshot_id]");
+ .hasMessage(
+ "[REQUIRED_PARAMETER_NOT_FOUND] Cannot invoke routine
`rollback_to_snapshot` because the parameter named `snapshot_id` is required,
but the routine call did not supply a value. Please update the routine call to
supply an argument value (either positionally at index 0 or by name) and retry
the query again. SQLSTATE: 4274K");
assertThatThrownBy(() -> sql("CALL %s.system.rollback_to_snapshot(table =>
't')", catalogName))
.isInstanceOf(AnalysisException.class)
- .hasMessage("Missing required parameters: [snapshot_id]");
+ .hasMessage(
+ "[REQUIRED_PARAMETER_NOT_FOUND] Cannot invoke routine
`rollback_to_snapshot` because the parameter named `snapshot_id` is required,
but the routine call did not supply a value. Please update the routine call to
supply an argument value (either positionally at index 1 or by name) and retry
the query again. SQLSTATE: 4274K");
assertThatThrownBy(() -> sql("CALL %s.system.rollback_to_snapshot('t',
2.2)", catalogName))
- .isInstanceOf(AnalysisException.class)
- .hasMessage("Wrong arg type for snapshot_id: cannot cast
DecimalType(2,1) to LongType");
+ .isInstanceOf(RuntimeException.class)
Review Comment:
Can we make the test capture the original intent (the cast exception)?
##########
spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java:
##########
@@ -265,7 +266,7 @@ public void testRewriteSmallManifestsWithoutCaching() {
.hasSize(1);
}
- @TestTemplate
+ @Disabled // Spark SQL does not support case insensitive for named arguments
Review Comment:
Unfortunately, Spark 4.0 will be supported in Iceberg 1.10?
##########
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java:
##########
@@ -50,21 +53,28 @@
* @see SparkActions#deleteOrphanFiles(Table)
*/
public class RemoveOrphanFilesProcedure extends BaseProcedure {
+
+ public static final String NAME = "remove_orphan_files";
+
private static final Logger LOG =
LoggerFactory.getLogger(RemoveOrphanFilesProcedure.class);
private static final ProcedureParameter[] PARAMETERS =
new ProcedureParameter[] {
- ProcedureParameter.required("table", DataTypes.StringType),
- ProcedureParameter.optional("older_than", DataTypes.TimestampType),
- ProcedureParameter.optional("location", DataTypes.StringType),
- ProcedureParameter.optional("dry_run", DataTypes.BooleanType),
- ProcedureParameter.optional("max_concurrent_deletes",
DataTypes.IntegerType),
- ProcedureParameter.optional("file_list_view", DataTypes.StringType),
- ProcedureParameter.optional("equal_schemes", STRING_MAP),
- ProcedureParameter.optional("equal_authorities", STRING_MAP),
- ProcedureParameter.optional("prefix_mismatch_mode",
DataTypes.StringType),
+ ProcedureParameter.in("table", DataTypes.StringType).build(),
+ ProcedureParameter.in("older_than",
DataTypes.TimestampType).defaultValue("NULL").build(),
Review Comment:
This is ugly, but I guess there isn't a better way. Can we encapsulate these
in a helper method in BaseProcedure, i.e. `optional("older_than",
DataTypes.TimestampType)`?
While we are at it, we might as well add a `required()` helper too.
##########
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/BaseProcedure.java:
##########
@@ -178,6 +184,29 @@ protected InternalRow newInternalRow(Object... values) {
return new GenericInternalRow(values);
}
+ protected static class Result implements LocalScan {
+ private final StructType readSchema;
+ private final InternalRow[] rows;
+
+ public Result(StructType readSchema, InternalRow[] rows) {
+ this.readSchema = readSchema;
+ this.rows = rows;
+ }
+
+ public StructType readSchema() {
Review Comment:
Add `@Override` here?
##########
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/BaseProcedure.java:
##########
@@ -178,6 +184,29 @@ protected InternalRow newInternalRow(Object... values) {
return new GenericInternalRow(values);
}
+ protected static class Result implements LocalScan {
+ private final StructType readSchema;
+ private final InternalRow[] rows;
+
+ public Result(StructType readSchema, InternalRow[] rows) {
+ this.readSchema = readSchema;
+ this.rows = rows;
+ }
+
+ public StructType readSchema() {
+ return this.readSchema;
+ }
+
+ @Override
+ public InternalRow[] rows() {
+ return this.rows;
+ }
+ }
+
+ protected Iterator<Scan> asIteratorScan(StructType readSchema,
InternalRow... rows) {
Review Comment:
nit: take `InternalRow[]` instead of varargs? All the callers seem to pass an
array anyway.
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/AddFilesProcedure.java:
##########
@@ -53,16 +56,20 @@
class AddFilesProcedure extends BaseProcedure {
+ public static final String NAME = "add_files";
Review Comment:
These don't need to be public (applies to all of these?). Let's avoid adding
unnecessary dependencies on them in case we need to move them later.
##########
spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestFastForwardBranchProcedure.java:
##########
@@ -167,18 +167,21 @@ public void testInvalidFastForwardBranchCases() {
sql(
"CALL %s.system.fast_forward('test_table', branch =>
'main', to => 'newBranch')",
catalogName))
- .isInstanceOf(AnalysisException.class)
- .hasMessage("Named and positional arguments cannot be mixed");
+ .isInstanceOf(RuntimeException.class)
+ .hasMessageStartingWith("Couldn't load table");
Review Comment:
Was this an error in the original test?
##########
spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestSetCurrentSnapshotProcedure.java:
##########
@@ -216,15 +219,16 @@ public void testInvalidRollbackToSnapshotCases() {
assertThatThrownBy(
() -> sql("CALL %s.system.set_current_snapshot(snapshot_id =>
1L)", catalogName))
.isInstanceOf(AnalysisException.class)
- .hasMessage("Missing required parameters: [table]");
+ .hasMessage(
+ "[REQUIRED_PARAMETER_NOT_FOUND] Cannot invoke routine
`set_current_snapshot` because the parameter named `table` is required, but the
routine call did not supply a value. Please update the routine call to supply
an argument value (either positionally at index 0 or by name) and retry the
query again. SQLSTATE: 4274K");
assertThatThrownBy(() -> sql("CALL %s.system.set_current_snapshot(table =>
't')", catalogName))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Either snapshot_id or ref must be provided, not both");
assertThatThrownBy(() -> sql("CALL %s.system.set_current_snapshot('t',
2.2)", catalogName))
- .isInstanceOf(AnalysisException.class)
- .hasMessage("Wrong arg type for snapshot_id: cannot cast
DecimalType(2,1) to LongType");
+ .isInstanceOf(RuntimeException.class)
Review Comment:
Same comment as the others.
##########
spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAncestorsOfProcedure.java:
##########
@@ -147,14 +147,16 @@ public void testAncestorOfUsingNamedArgs() {
public void testInvalidAncestorOfCases() {
assertThatThrownBy(() -> sql("CALL %s.system.ancestors_of()", catalogName))
.isInstanceOf(AnalysisException.class)
- .hasMessage("Missing required parameters: [table]");
+ .hasMessage(
+ "[REQUIRED_PARAMETER_NOT_FOUND] Cannot invoke routine
`ancestors_of` because the parameter named `table` is required, but the routine
call did not supply a value. Please update the routine call to supply an
argument value (either positionally at index 0 or by name) and retry the query
again. SQLSTATE: 4274K");
assertThatThrownBy(() -> sql("CALL %s.system.ancestors_of('')",
catalogName))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Cannot handle an empty identifier for parameter 'table'");
assertThatThrownBy(() -> sql("CALL %s.system.ancestors_of('%s', 1.1)",
catalogName, tableIdent))
- .isInstanceOf(AnalysisException.class)
- .hasMessageStartingWith("Wrong arg type for snapshot_id: cannot cast");
+ .isInstanceOf(RuntimeException.class)
Review Comment:
Can we change the test to capture the original idea?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]