hililiwei commented on code in PR #4625:
URL: https://github.com/apache/iceberg/pull/4625#discussion_r865523549
##########
flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/FlinkTestBase.java:
##########
@@ -78,7 +79,9 @@ protected TableEnvironment getTableEnv() {
.build();
TableEnvironment env = TableEnvironment.create(settings);
- env.getConfig().getConfiguration().set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM, false);
+ env.getConfig().getConfiguration()
+ .set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM, false)
+ .set(TableConfigOptions.LOCAL_TIME_ZONE, "UTC");
Review Comment:
good
##########
flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkSchemaUtil.java:
##########
@@ -52,35 +63,69 @@
*/
public class FlinkSchemaUtil {
+ public static final String FLINK_PREFIX = "flink.";
+
+ public static final String COMPUTED_COLUMNS = "computed-columns.";
Review Comment:
Use `computed-column`?
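For context, here is a minimal sketch of how a singular `computed-column.` prefix could be matched against table properties. It is not the PR's code: the class name `FlinkPropertyKeys`, the helper `entriesWithPrefix`, and the prefix values `flink.computed-column.` and `flink.watermark.` are assumptions made for illustration. It also takes the column name as everything after the prefix, whereas the PR takes the substring after the last `.`.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of the PR.
final class FlinkPropertyKeys {
  // Assumed values; the PR's actual constants may differ.
  static final String FLINK_PREFIX = "flink.";
  static final String COMPUTED_COLUMN_PREFIX = FLINK_PREFIX + "computed-column.";
  static final String WATERMARK_PREFIX = FLINK_PREFIX + "watermark.";

  private FlinkPropertyKeys() {
  }

  // Collects entries whose key starts with the prefix and whose value is non-null,
  // keyed by the remainder of the key after the prefix.
  static Map<String, String> entriesWithPrefix(Map<String, String> properties, String prefix) {
    Map<String, String> result = new LinkedHashMap<>();
    properties.forEach((key, value) -> {
      if (value != null && key.startsWith(prefix)) {
        result.put(key.substring(prefix.length()), value);
      }
    });
    return result;
  }

  public static void main(String[] args) {
    Map<String, String> properties = new LinkedHashMap<>();
    properties.put("flink.computed-column.ts_ltz", "TO_TIMESTAMP_LTZ(ts, 3)");
    properties.put("flink.watermark.ts_ltz", "ts_ltz - INTERVAL '5' SECOND");
    properties.put("write.format.default", "parquet");

    System.out.println(entriesWithPrefix(properties, COMPUTED_COLUMN_PREFIX));
    System.out.println(entriesWithPrefix(properties, WATERMARK_PREFIX));
  }
}
```

With the singular form, each key reads as one property per column (`flink.computed-column.<name>`), which is the naming I have in mind.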
##########
flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkSchemaUtil.java:
##########
@@ -172,4 +217,110 @@ public static TableSchema toSchema(Schema schema) {
return builder.build();
}
+
+
+ /**
+ * Convert a {@link Schema} to a {@link Schema}.
+ *
+ * @param schema iceberg schema to convert.
+ * @return Flink Schema.
+ */
+ public static org.apache.flink.table.api.Schema toSchema(Schema schema, Map<String, String> properties) {
+
+ org.apache.flink.table.api.Schema.Builder builder = org.apache.flink.table.api.Schema.newBuilder();
+
+ // get watermark and computed columns
+ Map<String, String> watermarkMap = Maps.newHashMap();
+ Map<String, String> computedColumnsMap = Maps.newHashMap();
+ properties.keySet().stream()
+ .filter(k -> k.startsWith(FLINK_PREFIX) && properties.get(k) != null)
+ .forEach(k -> {
+ final String name = k.substring(k.lastIndexOf('.') + 1);
+ String expr = properties.get(k);
+ if (k.startsWith(WATERMARK_PREFIX)) {
+ watermarkMap.put(name, expr);
+ } else if (k.startsWith(COMPUTED_COLUMNS_PREFIX)) {
+ computedColumnsMap.put(name, expr);
+ }
+ });
+
+ // add physical columns.
+ for (RowType.RowField field : convert(schema).getFields()) {
+ builder.column(field.getName(), TypeConversions.fromLogicalToDataType(field.getType()));
+ }
+
+ // add computed columns.
+ computedColumnsMap.forEach(builder::columnByExpression);
+
+ // add watermarks.
+ watermarkMap.forEach(builder::watermark);
+
+ // add primary key.
+ List<String> primaryKey = getPrimaryKeyFromSchema(schema);
+ if (!primaryKey.isEmpty()) {
+ builder.primaryKey(primaryKey.toArray(new String[0]));
+ }
+
+ return builder.build();
+ }
+
+ /**
+ * Convert a {@link CatalogTable} to a {@link ResolvedSchema}.
+ *
+ * @param table flink unresolved schema to convert.
+ * @return Flink ResolvedSchema.
+ */
+ public static ResolvedSchema convertToResolvedSchema(CatalogTable table) {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(new Configuration());
+ StreamTableEnvironment streamTableEnvironment = StreamTableEnvironment.create(env);
+ CatalogManager catalogManager = ((TableEnvironmentImpl) streamTableEnvironment).getCatalogManager();
+ SchemaResolver schemaResolver = catalogManager.getSchemaResolver();
+ return table.getUnresolvedSchema().resolve(schemaResolver);
Review Comment:
```suggestion
Configuration configuration = ExecutionEnvironment.getExecutionEnvironment().getConfiguration();
TableEnvironment tableEnvironment = TableEnvironment.create(configuration);
Preconditions.checkArgument(table.getDescription().isPresent(), "Illegal table.");
return tableEnvironment.from(table.getDescription().get()).getResolvedSchema();
```
Based on your comment
(https://github.com/apache/iceberg/pull/4246#issuecomment-1110614217), I took
another look at our previous proposal and have a suggestion, though it may not
be optimal.
I think using `TableEnvironment` directly is the better choice here. It is more
general: it applies whether the job is streaming, batch, or otherwise, and it
hides the underlying differences.
Let's see if somebody else has a better solution.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]