kbendick commented on a change in pull request #1393:
URL: https://github.com/apache/iceberg/pull/1393#discussion_r479712692



##########
File path: 
flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java
##########
@@ -88,4 +105,172 @@ public void testRenameTable() {
         Collections.singletonList(TableColumn.of("id", DataTypes.BIGINT())),
         tEnv.from("tl2").getSchema().getTableColumns());
   }
+
+  @Test
+  public void testCreateTable() throws TableNotExistException {
+    tEnv.executeSql("CREATE TABLE tl(id BIGINT)");
+
+    Table table = table("tl");
+    Assert.assertEquals(
+        new Schema(Types.NestedField.optional(1, "id", 
Types.LongType.get())).asStruct(),
+        table.schema().asStruct());
+    Assert.assertEquals(Maps.newHashMap(), table.properties());
+
+    CatalogTable catalogTable = catalogTable("tl");
+    Assert.assertEquals(TableSchema.builder().field("id", 
DataTypes.BIGINT()).build(), catalogTable.getSchema());
+    Assert.assertEquals(Maps.newHashMap(), catalogTable.getOptions());
+  }
+
+  @Test
+  public void testCreateTableLocation() {
+    Assume.assumeFalse("HadoopCatalog does not support creating table with 
location", isHadoopCatalog);
+
+    tEnv.executeSql("CREATE TABLE tl(id BIGINT) WITH 
('location'='/tmp/location')");
+
+    Table table = table("tl");
+    Assert.assertEquals(
+        new Schema(Types.NestedField.optional(1, "id", 
Types.LongType.get())).asStruct(),
+        table.schema().asStruct());
+    Assert.assertEquals("/tmp/location", table.location());
+    Assert.assertEquals(Maps.newHashMap(), table.properties());
+  }
+
+  @Test
+  public void testCreatePartitionTable() throws TableNotExistException {
+    tEnv.executeSql("CREATE TABLE tl(id BIGINT, dt STRING) PARTITIONED 
BY(dt)");
+
+    Table table = table("tl");
+    Assert.assertEquals(
+        new Schema(
+            Types.NestedField.optional(1, "id", Types.LongType.get()),
+            Types.NestedField.optional(2, "dt", 
Types.StringType.get())).asStruct(),
+        table.schema().asStruct());
+    
Assert.assertEquals(PartitionSpec.builderFor(table.schema()).identity("dt").build(),
 table.spec());
+    Assert.assertEquals(Maps.newHashMap(), table.properties());
+
+    CatalogTable catalogTable = catalogTable("tl");
+    Assert.assertEquals(
+        TableSchema.builder().field("id", DataTypes.BIGINT()).field("dt", 
DataTypes.STRING()).build(),
+        catalogTable.getSchema());
+    Assert.assertEquals(Maps.newHashMap(), catalogTable.getOptions());
+    Assert.assertEquals(Collections.singletonList("dt"), 
catalogTable.getPartitionKeys());
+  }
+
+  @Test
+  public void testLoadTransformPartitionTable() throws TableNotExistException {
+    Schema schema = new Schema(Types.NestedField.optional(0, "id", 
Types.LongType.get()));
+    validationCatalog.createTable(
+        TableIdentifier.of(icebergNamespace, "tl"), schema,
+        PartitionSpec.builderFor(schema).bucket("id", 100).build());
+
+    CatalogTable catalogTable = catalogTable("tl");
+    Assert.assertEquals(
+        TableSchema.builder().field("id", DataTypes.BIGINT()).build(),
+        catalogTable.getSchema());
+    Assert.assertEquals(Maps.newHashMap(), catalogTable.getOptions());
+    Assert.assertEquals(Collections.emptyList(), 
catalogTable.getPartitionKeys());
+  }
+
+  @Test
+  public void testAlterTable() throws TableNotExistException {
+    tEnv.executeSql("CREATE TABLE tl(id BIGINT) WITH ('oldK'='oldV')");
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put("oldK", "oldV");
+
+    // new
+    tEnv.executeSql("ALTER TABLE tl SET('newK'='newV')");
+    properties.put("newK", "newV");
+    Assert.assertEquals(properties, table("tl").properties());
+
+    // update old
+    tEnv.executeSql("ALTER TABLE tl SET('oldK'='oldV2')");
+    properties.put("oldK", "oldV2");
+    Assert.assertEquals(properties, table("tl").properties());
+
+    // remove property
+    CatalogTable catalogTable = catalogTable("tl");
+    properties.remove("oldK");
+    tEnv.getCatalog(tEnv.getCurrentCatalog()).get().alterTable(
+        new ObjectPath(DATABASE, "tl"), catalogTable.copy(properties), false);
+    Assert.assertEquals(properties, table("tl").properties());
+  }
+
+  @Test
+  public void testRelocateTable() {
+    Assume.assumeFalse("HadoopCatalog does not support relocate table", 
isHadoopCatalog);
+
+    tEnv.executeSql("CREATE TABLE tl(id BIGINT)");
+    tEnv.executeSql("ALTER TABLE tl SET('location'='/tmp/location')");

Review comment:
       However, looking ahead at the Flink 1.12 pre-release docs, it appears 
that support has been added for a `Hive Dialect`, which supports the 
`LOCATION` clause.
   
   
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/hive/hive_dialect.html#create-1
   
   ```hiveql
   CREATE [EXTERNAL] TABLE [IF NOT EXISTS] table_name
     [(col_name data_type [column_constraint] [COMMENT col_comment], ... 
[table_constraint])]
     [COMMENT table_comment]
     [PARTITIONED BY (col_name data_type [COMMENT col_comment], ...)]
     [
       [ROW FORMAT row_format]
       [STORED AS file_format]
     ]
     [LOCATION fs_path]
     [TBLPROPERTIES (property_name=property_value, ...)]
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to