RussellSpitzer commented on a change in pull request #1287:
URL: https://github.com/apache/iceberg/pull/1287#discussion_r464683194
##########
File path:
spark/src/test/java/org/apache/iceberg/spark/source/TestIdentityPartitionData.java
##########
@@ -110,24 +113,52 @@ public static void stopSpark() {
   private Table table = null;
   private Dataset<Row> logs = null;

-  @Before
-  public void setupTable() throws Exception {
+  /*
+   * Use the Hive based table to make identity partition columns with no duplication of the data in the underlying
+   * parquet files. This makes sure that if the identity mapping fails, the test will also fail.
+   */
+  private void setupParquet() throws Exception {
     File location = temp.newFolder("logs");
+    File hiveLocation = temp.newFolder("hive");
+    String hiveTable = "hivetable";
     Assert.assertTrue("Temp folder should exist", location.exists());

     Map<String, String> properties = ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format);
-    this.table = TABLES.create(LOG_SCHEMA, spec, properties, location.toString());
     this.logs = spark.createDataFrame(LOGS, LogMessage.class).select("id", "date", "level", "message");
+    spark.sql(String.format("DROP TABLE IF EXISTS %s", hiveTable));
+    logs.orderBy("date", "level", "id").write().partitionBy("date", "level").format("parquet")
+        .option("path", hiveLocation.toString()).saveAsTable(hiveTable);
+
+    this.table = TABLES.create(SparkSchemaUtil.schemaForTable(spark, hiveTable),
+        SparkSchemaUtil.specForTable(spark, hiveTable), properties, location.toString());
+
+    SparkTableUtil.importSparkTable(spark, new TableIdentifier(hiveTable), table, location.toString());
+  }

-    logs.orderBy("date", "level", "id").write().format("iceberg").mode("append").save(location.toString());
+  @Before
+  public void setupTable() throws Exception {
+    if (format.equals("parquet")) {
+      setupParquet();
+    } else {
+      File location = temp.newFolder("logs");
+      Assert.assertTrue("Temp folder should exist", location.exists());
+
+      Map<String, String> properties = ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format);
+      this.table = TABLES.create(LOG_SCHEMA, spec, properties, location.toString());
+      this.logs = spark.createDataFrame(LOGS, LogMessage.class).select("id", "date", "level", "message");
+
+      logs.orderBy("date", "level", "id").write().format("iceberg").mode("append").save(location.toString());
+    }
   }

   @Test
   public void testFullProjection() {
     List<Row> expected = logs.orderBy("id").collectAsList();
     List<Row> actual = spark.read().format("iceberg")
         .option("vectorization-enabled", String.valueOf(vectorized))
-        .load(table.location()).orderBy("id").collectAsList();
+        .load(table.location()).orderBy("id")
+        .select("id", "date", "level", "message")
Review comment:
I spent some time digging into this. When you call `saveAsTable`, it ends up in this bit of code in `DataFrameWriter`:
```scala
val tableDesc = CatalogTable(
  identifier = tableIdent,
  tableType = tableType,
  storage = storage,
  schema = new StructType,
  provider = Some(source),
  partitionColumnNames = partitioningColumns.getOrElse(Nil),
  bucketSpec = getBucketSpec)
```
This strips out whatever incoming schema you have (note `schema = new StructType`), so the new table is created without any information about the actual column ordering you used on the write side. Later, when the relation is resolved, the attributes are looked up again and the schema is rebuilt from the attribute output. Long story short: as far as I can tell, `saveAsTable` doesn't preserve your field ordering.
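To make that concrete, here is a minimal sketch (the table name `ordering_demo` is made up, and it assumes a live `SparkSession` named `spark`) of why a test comparing rows positionally after `saveAsTable` should pin the column order with an explicit `select`, as the diff above does:

```scala
// Sketch only: assumes a running SparkSession named `spark`.
val logs = spark.range(3).selectExpr("id", "CAST(id % 2 AS STRING) AS level")

// `logs` has columns (id, level), but the CatalogTable created by saveAsTable
// carries an empty schema plus the partition column names. The schema seen on
// read is rebuilt later from the resolved attributes, and partition columns
// typically end up at the end of the schema rather than where you wrote them.
logs.write.partitionBy("level").format("parquet").saveAsTable("ordering_demo")

// So a test that compares rows positionally should pin the ordering explicitly:
val readBack = spark.table("ordering_demo").select("id", "level")
```

This mirrors what the test change does: `.select("id", "date", "level", "message")` forces a known column order regardless of how the catalog rebuilt the schema.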
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]