dannycranmer commented on code in PR #15: URL: https://github.com/apache/flink-connector-aws/pull/15#discussion_r1026589061
########## flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbConfiguration.java: ########## @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.dynamodb.table; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.configuration.ReadableConfig; + +import static org.apache.flink.connector.dynamodb.table.DynamoDbConnectorOptions.FAIL_ON_ERROR; +import static org.apache.flink.connector.dynamodb.table.DynamoDbConnectorOptions.TABLE_NAME; + +/** DynamoDb specific configuration. */ +@PublicEvolving Review Comment: Why is this not `@Internal`? I think the public contract is `DynamoDbConnectorOptions` and this is a helper class? ########## flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbConnectorOptions.java: ########## @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.dynamodb.table; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.table.api.TableDescriptor; + +/** DynamoDb connector options. Made public for {@link TableDescriptor} to access it. */ +@PublicEvolving +public class DynamoDbConnectorOptions { Review Comment: How about the region, this is mandatory? How about the "overwrite by partition key" as optional? ########## flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSinkFactory.java: ########## @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.dynamodb.table; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.connector.base.table.AsyncDynamicTableSinkFactory; +import org.apache.flink.table.catalog.ResolvedCatalogTable; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.factories.FactoryUtil; + +import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableSet; + +import java.util.HashSet; +import java.util.Properties; +import java.util.Set; + +import static org.apache.flink.connector.dynamodb.table.DynamoDbConnectorOptions.TABLE_NAME; + +/** Factory for creating {@link DynamoDbDynamicSink}. */ +@Internal +public class DynamoDbDynamicSinkFactory extends AsyncDynamicTableSinkFactory { + + public static final String FACTORY_IDENTIFIER = "dynamodb"; + + @Override + public DynamicTableSink createDynamicTableSink(Context context) { + FactoryUtil.TableFactoryHelper factoryHelper = + FactoryUtil.createTableFactoryHelper(this, context); + ResolvedCatalogTable catalogTable = context.getCatalogTable(); + + FactoryUtil.validateFactoryOptions(this, factoryHelper.getOptions()); + + DynamoDbConfiguration dynamoDbConfiguration = + new DynamoDbConfiguration(factoryHelper.getOptions()); + + Properties dynamoDbClientProperties = new Properties(); + dynamoDbClientProperties.putAll(catalogTable.getOptions()); + return DynamoDbDynamicSink.builder() + .setTableName(dynamoDbConfiguration.getTableName()) + .setFailOnError(dynamoDbConfiguration.getFailOnError()) + .setPhysicalDataType(catalogTable.getResolvedSchema().toPhysicalRowDataType()) + .setOverwriteByPartitionKeys(new HashSet<>(catalogTable.getPartitionKeys())) + .setDynamoDbClientProperties(dynamoDbClientProperties) + .build(); + } + + @Override + public String factoryIdentifier() { + return FACTORY_IDENTIFIER; + } + + @Override + public Set<ConfigOption<?>> requiredOptions() { + return ImmutableSet.of(TABLE_NAME); Review Comment: Region? Or are we keeping it optional in case an endpoint URL is supplied? This is different from KDS https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/table/KinesisDynamicTableSinkFactory.java#L98 ########## flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSink.java: ########## @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.dynamodb.table; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.base.table.sink.AsyncDynamicTableSink; +import org.apache.flink.connector.base.table.sink.AsyncDynamicTableSinkBuilder; +import org.apache.flink.connector.dynamodb.sink.DynamoDbSink; +import org.apache.flink.connector.dynamodb.sink.DynamoDbSinkBuilder; +import org.apache.flink.connector.dynamodb.sink.DynamoDbWriteRequest; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.sink.SinkV2Provider; +import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.DataType; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Map; +import java.util.Properties; +import java.util.Set; + +/** + * A {@link DynamicTableSink} that describes how to create a {@link DynamoDbSink} from a logical + * description. + */ +@Internal +public class DynamoDbDynamicSink extends AsyncDynamicTableSink<DynamoDbWriteRequest> + implements SupportsPartitioning { + + private final String tableName; + private final boolean failOnError; + private final Properties dynamoDbClientProperties; + private final DataType physicalDataType; + private final Set<String> overwriteByPartitionKeys; + + protected DynamoDbDynamicSink( + @Nullable Integer maxBatchSize, + @Nullable Integer maxInFlightRequests, + @Nullable Integer maxBufferedRequests, + @Nullable Long maxBufferSizeInBytes, + @Nullable Long maxTimeInBufferMS, + String tableName, + boolean failOnError, + Properties dynamoDbClientProperties, + DataType physicalDataType, + Set<String> overwriteByPartitionKeys) { + super( + maxBatchSize, + maxInFlightRequests, + maxBufferedRequests, + maxBufferSizeInBytes, + maxTimeInBufferMS); + this.tableName = tableName; + this.failOnError = failOnError; + this.dynamoDbClientProperties = dynamoDbClientProperties; + this.physicalDataType = physicalDataType; + this.overwriteByPartitionKeys = overwriteByPartitionKeys; + } + + @Override + public ChangelogMode getChangelogMode(ChangelogMode changelogMode) { + return ChangelogMode.upsert(); + } + + @Override + public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { + DynamoDbSinkBuilder<RowData> builder = + DynamoDbSink.<RowData>builder() + .setTableName(tableName) + .setFailOnError(failOnError) + .setOverwriteByPartitionKeys(new ArrayList<>(overwriteByPartitionKeys)) + .setDynamoDbProperties(dynamoDbClientProperties) + .setElementConverter(new RowDataElementConverter(physicalDataType)); + + addAsyncOptionsToSinkBuilder(builder); + + return SinkV2Provider.of(builder.build()); + } + + @Override + public DynamicTableSink copy() { + return new DynamoDbDynamicSink( + maxBatchSize, + maxInFlightRequests, + maxBufferedRequests, + maxBufferSizeInBytes, + maxTimeInBufferMS, + tableName, + failOnError, + dynamoDbClientProperties, + physicalDataType, + overwriteByPartitionKeys); + } + + @Override + public String 
asSummaryString() { + return "DynamoDB"; + } + + @Override + public void applyStaticPartition(Map<String, String> partitions) { Review Comment: When is this called vs `overwriteByPartitionKeys` passed via the builder? ########## flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/converter/ArrayAttributeConverterProvider.java: ########## @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.dynamodb.table.converter; + +import org.apache.flink.annotation.Internal; + +import software.amazon.awssdk.enhanced.dynamodb.AttributeConverter; +import software.amazon.awssdk.enhanced.dynamodb.AttributeConverterProvider; +import software.amazon.awssdk.enhanced.dynamodb.EnhancedType; + +import java.math.BigDecimal; +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; + +/** Attribute converter provider for String Array. */ +@Internal +public class ArrayAttributeConverterProvider implements AttributeConverterProvider { + + private static final AttributeConverterProvider defaultAttributeConverterProvider = + AttributeConverterProvider.defaultProvider(); + + @Override + public <T> AttributeConverter<T> converterFor(EnhancedType<T> enhancedType) { Review Comment: How do you propose we keep this method in sync between Flink types and `EnhancedType` https://github.com/aws/aws-sdk-java-v2/blob/master/services-custom/dynamodb-enhanced/src/main/java/software/amazon/awssdk/enhanced/dynamodb/EnhancedType.java ?
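One way to keep the Flink-side mapping and the supported `EnhancedType`s from drifting apart could be a unit test that walks every array element type the Table API mapping is expected to produce and asserts the provider serves a converter for each. A sketch only, assuming JUnit 5 and a hypothetical test class; as written it would also flag the missing `Double` case discussed below:

```java
package org.apache.flink.connector.dynamodb.table.converter;

import org.junit.jupiter.api.Test;
import software.amazon.awssdk.enhanced.dynamodb.EnhancedType;

import java.math.BigDecimal;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.Arrays;
import java.util.List;

import static org.junit.jupiter.api.Assertions.assertNotNull;

class ArrayAttributeConverterProviderTest {

    @Test
    void everySupportedArrayElementTypeHasAConverter() {
        ArrayAttributeConverterProvider provider = new ArrayAttributeConverterProvider();
        // This list must mirror whatever RowDataToAttributeValueConverter can emit as Java arrays.
        List<EnhancedType<?>> supportedArrayTypes =
                Arrays.asList(
                        EnhancedType.of(String[].class),
                        EnhancedType.of(Boolean[].class),
                        EnhancedType.of(BigDecimal[].class),
                        EnhancedType.of(Byte[].class),
                        EnhancedType.of(Short[].class),
                        EnhancedType.of(Integer[].class),
                        EnhancedType.of(Long[].class),
                        EnhancedType.of(Float[].class),
                        EnhancedType.of(Double[].class),
                        EnhancedType.of(LocalDate[].class),
                        EnhancedType.of(LocalTime[].class),
                        EnhancedType.of(LocalDateTime[].class),
                        EnhancedType.of(Instant[].class));
        for (EnhancedType<?> arrayType : supportedArrayTypes) {
            assertNotNull(
                    provider.converterFor(arrayType),
                    "No array AttributeConverter registered for " + arrayType);
        }
    }
}
```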
########## flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/converter/ArrayAttributeConverterProvider.java: ########## @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.dynamodb.table.converter; + +import org.apache.flink.annotation.Internal; + +import software.amazon.awssdk.enhanced.dynamodb.AttributeConverter; +import software.amazon.awssdk.enhanced.dynamodb.AttributeConverterProvider; +import software.amazon.awssdk.enhanced.dynamodb.EnhancedType; + +import java.math.BigDecimal; +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; + +/** Attribute converter provider for String Array. */ +@Internal +public class ArrayAttributeConverterProvider implements AttributeConverterProvider { + + private static final AttributeConverterProvider defaultAttributeConverterProvider = + AttributeConverterProvider.defaultProvider(); + + @Override + public <T> AttributeConverter<T> converterFor(EnhancedType<T> enhancedType) { + if (enhancedType.equals(EnhancedType.of(String[].class))) { + return (AttributeConverter<T>) getArrayAttributeConverter(String.class, enhancedType); + } else if (enhancedType.equals(EnhancedType.of(Boolean[].class))) { + return (AttributeConverter<T>) getArrayAttributeConverter(Boolean.class, enhancedType); + } else if (enhancedType.equals(EnhancedType.of(BigDecimal[].class))) { + return (AttributeConverter<T>) + getArrayAttributeConverter(BigDecimal.class, enhancedType); + } else if (enhancedType.equals(EnhancedType.of(Byte[].class))) { + return (AttributeConverter<T>) getArrayAttributeConverter(Byte.class, enhancedType); + } else if (enhancedType.equals(EnhancedType.of(Short[].class))) { + return (AttributeConverter<T>) getArrayAttributeConverter(Short.class, enhancedType); + } else if (enhancedType.equals(EnhancedType.of(Integer[].class))) { + return (AttributeConverter<T>) getArrayAttributeConverter(Integer.class, enhancedType); + } else if (enhancedType.equals(EnhancedType.of(Long[].class))) { + return (AttributeConverter<T>) getArrayAttributeConverter(Long.class, enhancedType); + } else if (enhancedType.equals(EnhancedType.of(Float[].class))) { + return (AttributeConverter<T>) getArrayAttributeConverter(Float.class, enhancedType); + } else if (enhancedType.equals(EnhancedType.of(LocalDate[].class))) { + return (AttributeConverter<T>) + getArrayAttributeConverter(LocalDate.class, enhancedType); + } else if (enhancedType.equals(EnhancedType.of(LocalTime[].class))) { + return (AttributeConverter<T>) + getArrayAttributeConverter(LocalTime.class, enhancedType); + } else if (enhancedType.equals(EnhancedType.of(LocalDateTime[].class))) { + return (AttributeConverter<T>) + getArrayAttributeConverter(LocalDateTime.class, enhancedType); + } else if (enhancedType.equals(EnhancedType.of(Instant[].class))) { + return (AttributeConverter<T>) getArrayAttributeConverter(Instant.class, enhancedType); + } + return null; Review Comment: What happens in this failure mode? Should we throw an exception here? As discussed, you missed `Double`
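For illustration, the tail of `converterFor` could cover the missing `Double` case and fail loudly instead of returning `null`. A sketch only — whether to throw at all, and which exception type to use, is the author's call:

```java
        } else if (enhancedType.equals(EnhancedType.of(Double[].class))) {
            return (AttributeConverter<T>) getArrayAttributeConverter(Double.class, enhancedType);
        }
        // Returning null surfaces later as an opaque NullPointerException inside the enhanced
        // client; throwing here names the unsupported element type instead.
        throw new IllegalArgumentException(
                "No array AttributeConverter registered for element type " + enhancedType);
```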
########## flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/RowDataToAttributeValueConverter.java: ########## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.dynamodb.table; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.dynamodb.table.converter.ArrayAttributeConverterProvider; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.conversion.DataStructureConverters; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.KeyValueDataType; + +import software.amazon.awssdk.enhanced.dynamodb.AttributeConverterProvider; +import software.amazon.awssdk.enhanced.dynamodb.EnhancedType; +import software.amazon.awssdk.enhanced.dynamodb.TableSchema; +import software.amazon.awssdk.enhanced.dynamodb.mapper.StaticTableSchema; +import software.amazon.awssdk.services.dynamodb.model.AttributeValue; + +import java.io.Serializable; +import java.util.List; +import java.util.Map; + +import static org.apache.flink.table.data.RowData.createFieldGetter; + +/** Converts from Flink Table API internal type of {@link RowData} to {@link AttributeValue}. */ +@Internal +public class RowDataToAttributeValueConverter implements Serializable { Review Comment: nit: You _could_ create this class lazily in `RowDataElementConverter` so this does not need to be serialised/serializable. ########## flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbConnectorOptions.java: ########## @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.dynamodb.table; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.table.api.TableDescriptor; + +/** DynamoDb connector options. Made public for {@link TableDescriptor} to access it. */ +@PublicEvolving +public class DynamoDbConnectorOptions { + + private DynamoDbConnectorOptions() { + // private constructor to prevent initialization of static class + } Review Comment: nit: Constructors should go below fields
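A sketch of the earlier lazy-creation nit: `RowDataElementConverter` could keep only the serializable `DataType` and build the `RowDataToAttributeValueConverter` on first use, so nothing but the data type has to be serialized with the sink. Names follow the PR, imports as in the quoted class, and the write-request construction assumes the builder exposed by the sink's `DynamoDbWriteRequest`:

```java
@Internal
public class RowDataElementConverter implements ElementConverter<RowData, DynamoDbWriteRequest> {

    private final DataType physicalDataType;
    // Not serialized with the sink; rebuilt on the task manager when the first record arrives.
    private transient RowDataToAttributeValueConverter rowDataToAttributeValueConverter;

    public RowDataElementConverter(DataType physicalDataType) {
        this.physicalDataType = physicalDataType;
    }

    @Override
    public DynamoDbWriteRequest apply(RowData element, SinkWriter.Context context) {
        if (rowDataToAttributeValueConverter == null) {
            rowDataToAttributeValueConverter =
                    new RowDataToAttributeValueConverter(physicalDataType);
        }
        return DynamoDbWriteRequest.builder()
                .setType(DynamoDbWriteRequestType.PUT)
                .setItem(rowDataToAttributeValueConverter.convertRowData(element))
                .build();
    }
}
```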
########## flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/RowDataElementConverter.java: ########## @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.dynamodb.table; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.connector.sink2.SinkWriter; +import org.apache.flink.connector.base.sink.writer.ElementConverter; +import org.apache.flink.connector.dynamodb.sink.DynamoDbWriteRequest; +import org.apache.flink.connector.dynamodb.sink.DynamoDbWriteRequestType; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.DataType; + +/** + * Implementation of an {@link ElementConverter} for the DynamoDb Table sink. The element converter + * maps the Flink internal type of {@link RowData} to a {@link DynamoDbWriteRequest} to be used by + * the DynamoDb sink. + */ +@Internal +public class RowDataElementConverter implements ElementConverter<RowData, DynamoDbWriteRequest> { + + private final DataType physicalDataType; + private final RowDataToAttributeValueConverter rowDataToAttributeValueConverter; + + public RowDataElementConverter(DataType physicalDataType) { + this.physicalDataType = physicalDataType; + this.rowDataToAttributeValueConverter = + new RowDataToAttributeValueConverter(physicalDataType); + } Review Comment: For `DynamoDbBeanElementConverter` I create the table schema here to fail fast. Currently you do not create the schema until data is consumed; what happens if the table is invalid? https://github.com/apache/flink-connector-aws/blob/main/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbBeanElementConverter.java#L52
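The constructor does build the schema once on the client side (see the next hunk), but the transient field is only rebuilt on the task manager when the first record arrives, so a broken mapping shows up mid-stream. One hedged option for failing earlier is a deserialization hook in `RowDataToAttributeValueConverter` — purely illustrative, reusing the fields and `createTableSchema()` from the class below:

```java
    // Hypothetical addition to RowDataToAttributeValueConverter: rebuild the transient schema
    // immediately after deserialization instead of waiting for the first convertRowData() call,
    // so an unmappable physical data type fails when the sink writer is created.
    private void readObject(java.io.ObjectInputStream in)
            throws java.io.IOException, ClassNotFoundException {
        in.defaultReadObject();
        this.tableSchema = createTableSchema();
    }
```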
########## flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/RowDataToAttributeValueConverter.java: ########## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.dynamodb.table; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.dynamodb.table.converter.ArrayAttributeConverterProvider; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.conversion.DataStructureConverters; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.KeyValueDataType; + +import software.amazon.awssdk.enhanced.dynamodb.AttributeConverterProvider; +import software.amazon.awssdk.enhanced.dynamodb.EnhancedType; +import software.amazon.awssdk.enhanced.dynamodb.TableSchema; +import software.amazon.awssdk.enhanced.dynamodb.mapper.StaticTableSchema; +import software.amazon.awssdk.services.dynamodb.model.AttributeValue; + +import java.io.Serializable; +import java.util.List; +import java.util.Map; + +import static org.apache.flink.table.data.RowData.createFieldGetter; + +/** Converts from Flink Table API internal type of {@link RowData} to {@link AttributeValue}. */ +@Internal +public class RowDataToAttributeValueConverter implements Serializable { + private static final long serialVersionUID = 1L; + + private final DataType physicalDataType; + private transient TableSchema<RowData> tableSchema; + + public RowDataToAttributeValueConverter(DataType physicalDataType) { + this.physicalDataType = physicalDataType; + this.tableSchema = createTableSchema(); + } + + public Map<String, AttributeValue> convertRowData(RowData row) { + if (tableSchema == null) { + tableSchema = createTableSchema(); + } + return tableSchema.itemToMap(row, false); + } + + private StaticTableSchema<RowData> createTableSchema() { Review Comment: This is neat! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
