szehon-ho edited a comment on issue #2783:
URL: https://github.com/apache/iceberg/issues/2783#issuecomment-873869272
Adding a reproduction test (can run in spark3-extensions test case, for
example):
```
package org.apache.iceberg.spark.extensions;
import com.google.common.collect.Lists;
import org.apache.iceberg.spark.SparkCatalog;
import org.apache.iceberg.spark.source.SimpleRecord;
import org.apache.iceberg.spark.source.SparkTable;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import scala.Some;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
public class TestSparkMetadataTable extends SparkExtensionsTestBase {
public TestSparkMetadataTable(String catalogName, String implementation,
Map<String, String> config) {
super(catalogName, implementation, config);
}
@Test
public void testDataFileProjectionError1() throws Exception {
// init load
List<SimpleRecord> records = Lists.newArrayList(new SimpleRecord(1,
"1"));
Dataset<Row> inputDf = spark.createDataFrame(records,
SimpleRecord.class);
inputDf.writeTo(tableName).create();
SparkCatalog catalog = (SparkCatalog)
spark.sessionState().catalogManager().catalog(catalogName);
String[] tableIdentifiers = tableName.split("\\.");
Identifier metaId = Identifier.of(
new String[]{tableIdentifiers[1], tableIdentifiers[2]}, "entries");
SparkTable metaTable = catalog.loadTable(metaId);
Dataset<Row> entriesDs = Dataset.ofRows(spark,
DataSourceV2Relation.create(metaTable, Some.apply(catalog), Some.apply(
metaId)));
Column aggCol = entriesDs.col("data_file.record_count");
Dataset<Row> agg = entriesDs.agg(max(aggCol));
Assert.assertFalse(agg.collectAsList().isEmpty());
}
@Test
public void testDataFileProjectionError2() throws Exception {
// init load
List<SimpleRecord> records = Lists.newArrayList(new SimpleRecord(1,
"1"));
Dataset<Row> inputDf = spark.createDataFrame(records,
SimpleRecord.class);
inputDf.writeTo(tableName).create();
Dataset<Row> stringDs = spark.createDataset(Arrays.asList("my_path"),
Encoders.STRING())
.toDF("file_path");
SparkCatalog catalog = (SparkCatalog)
spark.sessionState().catalogManager().catalog(catalogName);
String[] tableIdentifiers = tableName.split("\\.");
Identifier metaId = Identifier.of(
new String[]{tableIdentifiers[1], tableIdentifiers[2]}, "entries");
SparkTable metaTable = catalog.loadTable(metaId);
Dataset<Row> entriesDs = Dataset.ofRows(spark,
DataSourceV2Relation.create(metaTable, Some.apply(catalog), Some.apply(
metaId)));
Column joinCond =
entriesDs.col("data_file.file_path").equalTo(stringDs.col("file_path"));
Dataset<Row> res = entriesDs.join(stringDs, joinCond);
boolean empty = res.isEmpty();
Assert.assertEquals(true, empty);
}
@After
public void dropTables() {
sql("DROP TABLE IF EXISTS %s", tableName);
}
}
```
Side note: I use "data_file" field to reproduce it, if I do not then I hit
other error: https://github.com/apache/iceberg/issues/1378 and
https://github.com/apache/iceberg/issues/1735 (same underlying error)
There are two tests, it shows that even a simple aggregation now fails even
with the workaround of using "data_file".
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]