hililiwei commented on code in PR #4625:
URL: https://github.com/apache/iceberg/pull/4625#discussion_r865523549


##########
flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/FlinkTestBase.java:
##########
@@ -78,7 +79,9 @@ protected TableEnvironment getTableEnv() {
               .build();
 
           TableEnvironment env = TableEnvironment.create(settings);
-          env.getConfig().getConfiguration().set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM, false);
+          env.getConfig().getConfiguration()
+              .set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM, false)
+              .set(TableConfigOptions.LOCAL_TIME_ZONE, "UTC");

Review Comment:
   good
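
   Pinning `TableConfigOptions.LOCAL_TIME_ZONE` to UTC keeps timestamp rendering deterministic across machines, because the same instant prints differently under different session zones. A plain `java.time` sketch of that effect (no Flink on the classpath; the class and `render` helper are illustrative, not part of the PR):

   ```java
   import java.time.Instant;
   import java.time.ZoneId;
   import java.time.format.DateTimeFormatter;

   public class TimeZonePinning {
       // Hypothetical helper: render an instant the way a session-local
       // timestamp column would be displayed in the given zone.
       static String render(Instant instant, String zone) {
           return DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
                   .withZone(ZoneId.of(zone))
                   .format(instant);
       }

       public static void main(String[] args) {
           Instant ts = Instant.parse("2022-05-04T12:00:00Z");
           // Same instant, different session zones -> different rendered strings.
           System.out.println(render(ts, "UTC"));           // 2022-05-04 12:00:00
           System.out.println(render(ts, "Asia/Shanghai")); // 2022-05-04 20:00:00
       }
   }
   ```

   Without the pin, a test asserting on rendered timestamps would pass or fail depending on the machine's default zone.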



##########
flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkSchemaUtil.java:
##########
@@ -52,35 +63,69 @@
  */
 public class FlinkSchemaUtil {
 
+  public static final String FLINK_PREFIX = "flink.";
+
+  public static final String COMPUTED_COLUMNS = "computed-columns.";

Review Comment:
   use `computed-column`?
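
   Whichever spelling is settled on, the prefixes are only used to route `flink.`-scoped properties into watermark and computed-column maps keyed by the suffix after the last `.`. A standalone sketch of that routing, mirroring the loop in `toSchema` (the prefix constants and the `partition` helper are hypothetical stand-ins, runnable without Flink):

   ```java
   import java.util.LinkedHashMap;
   import java.util.Map;

   public class PropertyPrefixes {
       // Hypothetical prefixes mirroring the ones under review; the final
       // name ("computed-columns." vs "computed-column.") is still open.
       static final String FLINK_PREFIX = "flink.";
       static final String COMPUTED_COLUMNS_PREFIX = FLINK_PREFIX + "computed-columns.";
       static final String WATERMARK_PREFIX = FLINK_PREFIX + "watermark.";

       // Split "flink."-scoped properties into watermark and computed-column
       // maps, keyed by the column name after the last '.'.
       static Map<String, Map<String, String>> partition(Map<String, String> props) {
           Map<String, String> watermarks = new LinkedHashMap<>();
           Map<String, String> computed = new LinkedHashMap<>();
           props.forEach((k, v) -> {
               if (v == null || !k.startsWith(FLINK_PREFIX)) {
                   return; // ignore non-flink-scoped properties
               }
               String name = k.substring(k.lastIndexOf('.') + 1);
               if (k.startsWith(WATERMARK_PREFIX)) {
                   watermarks.put(name, v);
               } else if (k.startsWith(COMPUTED_COLUMNS_PREFIX)) {
                   computed.put(name, v);
               }
           });
           Map<String, Map<String, String>> out = new LinkedHashMap<>();
           out.put("watermark", watermarks);
           out.put("computed", computed);
           return out;
       }

       public static void main(String[] args) {
           Map<String, String> props = new LinkedHashMap<>();
           props.put("flink.computed-columns.ts_day", "DATE_FORMAT(ts, 'yyyy-MM-dd')");
           props.put("flink.watermark.ts", "ts - INTERVAL '5' SECOND");
           props.put("write.format.default", "parquet"); // ignored: not flink-scoped
           System.out.println(partition(props));
       }
   }
   ```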



##########
flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkSchemaUtil.java:
##########
@@ -172,4 +217,110 @@ public static TableSchema toSchema(Schema schema) {
 
     return builder.build();
   }
+
+
+  /**
+   * Convert an Iceberg {@link Schema} to a Flink {@link org.apache.flink.table.api.Schema}.
+   *
+   * @param schema iceberg schema to convert.
+   * @param properties table properties that may carry watermark and computed-column expressions.
+   * @return Flink Schema.
+   */
+  public static org.apache.flink.table.api.Schema toSchema(Schema schema, Map<String, String> properties) {
+
+    org.apache.flink.table.api.Schema.Builder builder = org.apache.flink.table.api.Schema.newBuilder();
+
+    // get watermark and computed columns
+    Map<String, String> watermarkMap = Maps.newHashMap();
+    Map<String, String> computedColumnsMap = Maps.newHashMap();
+    properties.keySet().stream()
+        .filter(k -> k.startsWith(FLINK_PREFIX) && properties.get(k) != null)
+        .forEach(k -> {
+          final String name = k.substring(k.lastIndexOf('.') + 1);
+          String expr = properties.get(k);
+          if (k.startsWith(WATERMARK_PREFIX)) {
+            watermarkMap.put(name, expr);
+          } else if (k.startsWith(COMPUTED_COLUMNS_PREFIX)) {
+            computedColumnsMap.put(name, expr);
+          }
+        });
+
+    // add physical columns.
+    for (RowType.RowField field : convert(schema).getFields()) {
+      builder.column(field.getName(), TypeConversions.fromLogicalToDataType(field.getType()));
+    }
+
+    // add computed columns.
+    computedColumnsMap.forEach(builder::columnByExpression);
+
+    // add watermarks.
+    watermarkMap.forEach(builder::watermark);
+
+    // add primary key.
+    List<String> primaryKey = getPrimaryKeyFromSchema(schema);
+    if (!primaryKey.isEmpty()) {
+      builder.primaryKey(primaryKey.toArray(new String[0]));
+    }
+
+    return builder.build();
+  }
+
+  /**
+   * Convert a {@link CatalogTable} to a {@link ResolvedSchema}.
+   *
+   * @param table flink unresolved schema to convert.
+   * @return Flink ResolvedSchema.
+   */
+  public static ResolvedSchema convertToResolvedSchema(CatalogTable table) {
+    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(new Configuration());
+    StreamTableEnvironment streamTableEnvironment = StreamTableEnvironment.create(env);
+    CatalogManager catalogManager = ((TableEnvironmentImpl) streamTableEnvironment).getCatalogManager();
+    SchemaResolver schemaResolver = catalogManager.getSchemaResolver();
+    return table.getUnresolvedSchema().resolve(schemaResolver);

Review Comment:
   ```suggestion
    Configuration configuration = ExecutionEnvironment.getExecutionEnvironment().getConfiguration();
    TableEnvironment tableEnvironment = TableEnvironment.create(configuration);
    Preconditions.checkArgument(table.getDescription().isPresent(), "Illegal table.");
    return tableEnvironment.from(table.getDescription().get()).getResolvedSchema();
   ```
   
   Based on your comment (https://github.com/apache/iceberg/pull/4246#issuecomment-1110614217), I took another look at our previous proposal and made a suggestion, though it may not be optimal.
   Here I think using `TableEnvironment` directly is the better choice: it is more general, covering streaming, batch, and other modes, and it hides the underlying differences.
   Let's see if somebody else has a better solution.
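
   One property of the suggested version is that it fails fast when the table path is absent, instead of reaching into `CatalogManager` internals. A plain-Java sketch of that guard-then-resolve shape (the `resolve` helper and the `Function`-based resolver are hypothetical stand-ins for `tableEnvironment.from(path).getResolvedSchema()`, so this runs without Flink):

   ```java
   import java.util.Optional;
   import java.util.function.Function;

   public class ResolveByPath {
       // Guard-then-resolve: reject an absent table path up front, then hand
       // the concrete path to the resolver. The resolver is modeled as a plain
       // Function here; in the suggestion it would be the TableEnvironment lookup.
       static <S> S resolve(Optional<String> tablePath, Function<String, S> resolver) {
           if (!tablePath.isPresent()) {
               throw new IllegalArgumentException("Illegal table.");
           }
           return resolver.apply(tablePath.get());
       }

       public static void main(String[] args) {
           String schema = resolve(Optional.of("db.tbl"), path -> "schema-of-" + path);
           System.out.println(schema); // schema-of-db.tbl
       }
   }
   ```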
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

