kbendick commented on a change in pull request #3745:
URL: https://github.com/apache/iceberg/pull/3745#discussion_r769142116
##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java
##########
@@ -766,9 +774,22 @@ public static TableIdentifier identifierToTableIdentifier(Identifier identifier)
org.apache.spark.sql.execution.datasources.PartitionSpec spec = fileIndex.partitionSpec();
StructType schema = spec.partitionColumns();
+ if (schema.isEmpty()) {
+ return new ArrayList<>();
+ }
+
+ List<org.apache.spark.sql.catalyst.expressions.Expression> filterExpressions =
+ getPartitionFilterExpressions(schema, partitionFilter);
+
+ List<org.apache.spark.sql.catalyst.expressions.Expression> dataFilters = new java.util.ArrayList<>();
Review comment:
Nit: Same note as my comment on the `schema.isEmpty()` early return: please
avoid `new ArrayList<>` in favor of one of the alternatives suggested there,
such as `Lists.newArrayList()` 👍
##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java
##########
@@ -779,10 +800,54 @@ public static TableIdentifier identifierToTableIdentifier(Identifier identifier)
Object value = CatalystTypeConverters.convertToScala(catalystValue, field.dataType());
values.put(field.name(), value.toString());
});
- return new SparkPartition(values, partition.path().toString(), format);
+ FileStatus fileStatus =
+ scala.collection.JavaConverters.seqAsJavaListConverter(partition.files()).asJava().get(0);
+
+ return new SparkPartition(values, fileStatus.getPath().getParent().toString(), format);
}).collect(Collectors.toList());
}
+ private static List getPartitionFilterExpressions(StructType schema,
+ Map<String, String> partitionFilter) {
+ List<org.apache.spark.sql.catalyst.expressions.Expression> filterExpressions = new java.util.ArrayList<>();
+ for (Map.Entry<String, String> entry : partitionFilter.entrySet()) {
+ try {
+ // IllegalArgumentException is thrown if schema doesn't contain this entry,
+ // which means partition filter is not on partition columns.
+ int index = schema.fieldIndex(entry.getKey());
+ org.apache.spark.sql.types.DataType dataType = schema.fields()[index].dataType();
+ BoundReference ref = new BoundReference(index, dataType, true);
+ if (dataType.sameType(DataTypes.IntegerType) || dataType.sameType(DataTypes.ShortType) ||
+ dataType.sameType(DataTypes.ByteType)) {
+ filterExpressions.add(new EqualTo(ref,
+ org.apache.spark.sql.catalyst.expressions.Literal.create(Integer.parseInt(entry.getValue()),
+ DataTypes.IntegerType)));
+ } else if (schema.fields()[index].dataType().sameType(DataTypes.StringType)) {
+ filterExpressions.add(new EqualTo(ref,
+ org.apache.spark.sql.catalyst.expressions.Literal.create(entry.getValue(), DataTypes.StringType)));
+ } else if (schema.fields()[index].dataType().sameType(DataTypes.LongType)) {
+ filterExpressions.add(new EqualTo(ref,
+ org.apache.spark.sql.catalyst.expressions.Literal.create(Long.parseLong(entry.getValue()),
+ DataTypes.LongType)));
+ } else if (schema.fields()[index].dataType().sameType(DataTypes.DateType)) {
+ filterExpressions.add(new EqualTo(ref,
+ org.apache.spark.sql.catalyst.expressions.Literal.create(entry.getValue(), DataTypes.DateType)));
+ } else if (schema.fields()[index].dataType().sameType(DataTypes.TimestampType)) {
+ filterExpressions.add(new EqualTo(ref,
+ org.apache.spark.sql.catalyst.expressions.Literal.create(entry.getValue(), DataTypes.TimestampType)));
+ } else if (schema.fields()[index].dataType()
+ .sameType(DataTypes.CalendarIntervalType)) {
+ filterExpressions.add(new EqualTo(ref,
+ org.apache.spark.sql.catalyst.expressions.Literal.create(entry.getValue(),
+ DataTypes.CalendarIntervalType)));
Review comment:
For this large if-else-if chain, you might want to look into the
lookup-map pattern used here:
https://github.com/apache/iceberg/blob/466073b7d8c23ebeae045822ee6e1a1104a5ed5a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkFilters.java#L80-L220
I'm not sure whether a lookup map can be used here, because the branches
rely on the `sameType` function, but it might be worth looking into 😄
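
To make the suggestion concrete, here is a minimal sketch of the lookup-map idea in plain Java. It is not taken from `SparkFilters.java` or from this PR: the class `PartitionValueParsers`, its string type keys, and the `parse` method are all hypothetical names made up for illustration. Whether Spark's `DataType` objects could serve as map keys in place of the `sameType` checks is exactly the open question above, so the sketch sidesteps it by keying on a plain type name.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for the if/else-if chain: one map lookup selects the
// parser for a partition value. None of these names exist in Spark or Iceberg.
final class PartitionValueParsers {
  private static final Map<String, Function<String, Object>> PARSERS = new HashMap<>();

  static {
    // Integer, short, and byte columns share one parser, mirroring the first branch of the chain.
    PARSERS.put("integer", Integer::parseInt);
    PARSERS.put("short", Integer::parseInt);
    PARSERS.put("byte", Integer::parseInt);
    PARSERS.put("long", Long::parseLong);
    // String values are passed through unchanged.
    PARSERS.put("string", value -> value);
  }

  private PartitionValueParsers() {
  }

  // Looks up the parser for the given type name and applies it to the raw value.
  // Unknown type names fail fast instead of falling through silently.
  static Object parse(String typeName, String rawValue) {
    Function<String, Object> parser = PARSERS.get(typeName);
    if (parser == null) {
      throw new IllegalArgumentException("Unsupported partition column type: " + typeName);
    }
    return parser.apply(rawValue);
  }
}
```

With a table like this, supporting another type becomes a single `put` call rather than another `else if` branch, and the unsupported-type case is handled in one place.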
##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java
##########
@@ -766,9 +774,22 @@ public static TableIdentifier identifierToTableIdentifier(Identifier identifier)
org.apache.spark.sql.execution.datasources.PartitionSpec spec = fileIndex.partitionSpec();
StructType schema = spec.partitionColumns();
+ if (schema.isEmpty()) {
+ return new ArrayList<>();
Review comment:
Nit: Can you use `Lists.newArrayList()` or `ImmutableList.of()` here?
For `Lists`, we use the repackaged internal Guava version from
`org.apache.iceberg.relocated.com.google.common.collect.Lists`. The same is
true for `ImmutableList`, which is already imported.
You could also use `Collections.emptyList()`, to be consistent with the
`emptyMap` above.
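
One thing to keep in mind when choosing between these options: `Collections.emptyList()` (like an immutable Guava list) rejects mutation, so it only fits if no caller adds to the returned list. The sketch below shows the difference using only the JDK. The class `EmptyListChoices` and its methods are hypothetical names for illustration, and plain `new ArrayList<>()` stands in for the mutable case only so that the example runs without Guava on the classpath.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper contrasting an immutable empty list with a mutable one.
final class EmptyListChoices {
  private EmptyListChoices() {
  }

  // Shared immutable empty list: no allocation, but callers must not mutate it.
  static List<String> immutableEmpty() {
    return Collections.emptyList();
  }

  // Fresh mutable list for callers that add elements later.
  static List<String> mutableEmpty() {
    return new ArrayList<>();
  }

  // Returns true if the list refuses an add, false if the add succeeds.
  static boolean rejectsAdd(List<String> list) {
    try {
      list.add("x");
      return false;
    } catch (UnsupportedOperationException e) {
      return true;
    }
  }
}
```

For an early return that callers only read, the immutable variant is the safer default. For a list that is filled in afterwards, such as `dataFilters`, a mutable list is required.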
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]