wuchong commented on a change in pull request #12140:
URL: https://github.com/apache/flink/pull/12140#discussion_r424859613



##########
File path: flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatFactory.java
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.json;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.ScanFormat;
+import org.apache.flink.table.connector.format.SinkFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DeserializationFormatFactory;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.SerializationFormatFactory;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Table format factory for providing configured instances of JSON to RowData
+ * {@link SerializationSchema} and {@link DeserializationSchema}.
+ */
+public class JsonFormatFactory implements
+               DeserializationFormatFactory,
+               SerializationFormatFactory {
+
+       public static final String IDENTIFIER = "json";
+
+       // ------------------------------------------------------------------------
+       //  Options
+       // ------------------------------------------------------------------------
+
+       private static final ConfigOption<Boolean> FAIL_ON_MISSING_FIELD = ConfigOptions
+                       .key("fail-on-missing-field")
+                       .booleanType()
+                       .defaultValue(false)
+                       .withDescription("Optional flag to specify whether to fail if a field is missing or not, false by default");
+
+       private static final ConfigOption<Boolean> IGNORE_PARSE_ERRORS = ConfigOptions
+                       .key("ignore-parse-errors")
+                       .booleanType()
+                       .defaultValue(false)
+                       .withDescription("Optional flag to skip fields and rows with parse errors instead of failing;\n"
+                                       + "fields are set to null in case of errors");

Review comment:
       ```suggestion
                        .withDescription("Optional flag to skip fields and rows with parse errors instead of failing, "
                                        + "fields are set to null in case of errors. Default is false.");
       ```

##########
File path: flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonFormatFactoryTest.java
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.json;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogTableImpl;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.TestDynamicTableFactory;
+import org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext;
+import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
+import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Consumer;
+
+import static org.apache.flink.util.CoreMatchers.containsCause;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for the {@link JsonFormatFactory}.
+ */
+public class JsonFormatFactoryTest extends TestLogger {
+       @Rule
+       public ExpectedException thrown = ExpectedException.none();
+
+       private static final TableSchema SCHEMA = TableSchema.builder()
+                       .field("field1", DataTypes.BOOLEAN())
+                       .field("field2", DataTypes.INT())
+                       .build();
+
+       private static final RowType ROW_TYPE = (RowType) SCHEMA.toRowDataType().getLogicalType();
+
+       @Test
+       public void testSeDeSchema() {
+               final Map<String, String> tableOptions = getAllOptions();
+
+               testSchemaSerializationSchema(tableOptions);
+
+               testSchemaDeserializationSchema(tableOptions);
+       }
+
+       @Test
+       public void testFailOnMissingField() {
+               final Map<String, String> tableOptions = getModifyOptions(
+                               options -> options.put("json.fail-on-missing-field", "true"));
+
+               thrown.expect(ValidationException.class);
+               thrown.expect(containsCause(new ValidationException("fail-on-missing-field and ignore-parse-errors shouldn't both be true.")));
+               testSchemaDeserializationSchema(tableOptions);
+       }
+
+       @Test
+       public void testInvalidOptionForIgnoreParseErrors() {
+               final Map<String, String> tableOptions = getModifyOptions(
+                               options -> options.put("json.ignore-parse-errors", "abc"));
+
+               thrown.expect(ValidationException.class);
+               thrown.expect(containsCause(new IllegalArgumentException("Unrecognized option for boolean: abc. Expected either true or false(case insensitive)")));
+               testSchemaDeserializationSchema(tableOptions);
+       }
+
+       // ------------------------------------------------------------------------
+       //  Utilities
+       // ------------------------------------------------------------------------
+
+       private void testSchemaDeserializationSchema(Map<String, String> options) {
+               final JsonRowDataDeserializationSchema expectedDeser =
+                               new JsonRowDataDeserializationSchema(
+                                               ROW_TYPE,
+                                               new RowDataTypeInfo(ROW_TYPE),
+                                               false,
+                                               true);
+
+               final DynamicTableSource actualSource = createTableSource(options);
+               assert actualSource instanceof TestDynamicTableFactory.DynamicTableSourceMock;
+               TestDynamicTableFactory.DynamicTableSourceMock scanSourceMock =
+                               (TestDynamicTableFactory.DynamicTableSourceMock) actualSource;
+
+               DeserializationSchema<RowData> actualDeser = scanSourceMock.sourceValueFormat
+                               .createScanFormat(
+                                               ScanRuntimeProviderContext.INSTANCE,
+                                               SCHEMA.toRowDataType());
+
+               assertEquals(expectedDeser, actualDeser);
+       }
+
+       private void testSchemaSerializationSchema(Map<String, String> options) {
+               final DynamicTableSink actualSink = createTableSink(options);
+               assert actualSink instanceof TestDynamicTableFactory.DynamicTableSinkMock;
+               TestDynamicTableFactory.DynamicTableSinkMock sinkMock =
+                               (TestDynamicTableFactory.DynamicTableSinkMock) actualSink;
+
+               SerializationSchema<RowData> actualSer = sinkMock.sinkValueFormat
+                               .createSinkFormat(
+                                               new SinkRuntimeProviderContext(false),
+                                               SCHEMA.toRowDataType());
+
+               assertThat(actualSer, instanceOf(JsonRowDataSerializationSchema.class));

Review comment:
       Could you use the same validation as above? I think we will introduce some new options for JSON serialization soon.
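
       For illustration, a rough sketch of what mirroring the deserialization check could look like, reusing the helpers from the test above (hypothetical; it assumes `JsonRowDataSerializationSchema` implements `equals()`, like the deserialization schema does):

   ```java
   // Sketch only: compare against an expected schema instance instead of an
   // instanceOf check, so future serialization options are verified as well.
   // Assumes JsonRowDataSerializationSchema implements equals()/hashCode().
   final JsonRowDataSerializationSchema expectedSer = new JsonRowDataSerializationSchema(ROW_TYPE);

   SerializationSchema<RowData> actualSer = sinkMock.sinkValueFormat
           .createSinkFormat(
                   new SinkRuntimeProviderContext(false),
                   SCHEMA.toRowDataType());

   assertEquals(expectedSer, actualSer);
   ```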

##########
File path: flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatFactory.java
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.json;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.ScanFormat;
+import org.apache.flink.table.connector.format.SinkFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DeserializationFormatFactory;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.SerializationFormatFactory;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Table format factory for providing configured instances of JSON to RowData
+ * {@link SerializationSchema} and {@link DeserializationSchema}.
+ */
+public class JsonFormatFactory implements
+               DeserializationFormatFactory,
+               SerializationFormatFactory {
+
+       public static final String IDENTIFIER = "json";
+
+       // ------------------------------------------------------------------------
+       //  Options
+       // ------------------------------------------------------------------------
+
+       private static final ConfigOption<Boolean> FAIL_ON_MISSING_FIELD = ConfigOptions
+                       .key("fail-on-missing-field")
+                       .booleanType()
+                       .defaultValue(false)
+                       .withDescription("Optional flag to specify whether to fail if a field is missing or not, false by default");
+
+       private static final ConfigOption<Boolean> IGNORE_PARSE_ERRORS = ConfigOptions
+                       .key("ignore-parse-errors")
+                       .booleanType()
+                       .defaultValue(false)
+                       .withDescription("Optional flag to skip fields and rows with parse errors instead of failing;\n"
+                                       + "fields are set to null in case of errors");
+
+       @SuppressWarnings("unchecked")
+       @Override
+       public ScanFormat<DeserializationSchema<RowData>> createScanFormat(
+                       DynamicTableFactory.Context context,
+                       ReadableConfig formatOptions) {
+               FactoryUtil.validateFactoryOptions(this, formatOptions);
+               validateFormatOptions(formatOptions);
+
+               final boolean failOnMissingField = formatOptions.get(FAIL_ON_MISSING_FIELD);
+               final boolean ignoreParseErrors = formatOptions.get(IGNORE_PARSE_ERRORS);
+
+               return new ScanFormat<DeserializationSchema<RowData>>() {
+                       @Override
+                       public DeserializationSchema<RowData> createScanFormat(
+                                       ScanTableSource.Context scanContext,
+                                       DataType producedDataType) {
+                               final RowType rowType = (RowType) producedDataType.getLogicalType();
+                               final TypeInformation<RowData> rowDataTypeInfo =
+                                               (TypeInformation<RowData>) scanContext.createTypeInformation(producedDataType);
+                               return new JsonRowDataDeserializationSchema(
+                                               rowType,
+                                               rowDataTypeInfo,
+                                               failOnMissingField,
+                                               ignoreParseErrors);
+                       }
+
+                       @Override
+                       public ChangelogMode getChangelogMode() {
+                               return ChangelogMode.insertOnly();
+                       }
+               };
+       }
+
+       @Override
+       public SinkFormat<SerializationSchema<RowData>> createSinkFormat(
+                       DynamicTableFactory.Context context,
+                       ReadableConfig formatOptions) {
+               FactoryUtil.validateFactoryOptions(this, formatOptions);
+
+               return new SinkFormat<SerializationSchema<RowData>>() {
+                       @Override
+                       public SerializationSchema<RowData> createSinkFormat(
+                                       DynamicTableSink.Context context,
+                                       DataType consumedDataType) {
+                               final RowType rowType = (RowType) consumedDataType.getLogicalType();
+                               return new JsonRowDataSerializationSchema(rowType);
+                       }
+
+                       @Override
+                       public ChangelogMode getChangelogMode() {
+                               return ChangelogMode.insertOnly();
+                       }
+               };
+       }
+
+       @Override
+       public String factoryIdentifier() {
+               return IDENTIFIER;
+       }
+
+       @Override
+       public Set<ConfigOption<?>> requiredOptions() {
+               return Collections.emptySet();
+       }
+
+       @Override
+       public Set<ConfigOption<?>> optionalOptions() {
+               Set<ConfigOption<?>> options = new HashSet<>();
+               options.add(FAIL_ON_MISSING_FIELD);
+               options.add(IGNORE_PARSE_ERRORS);
+               return options;
+       }
+
+       // ------------------------------------------------------------------------
+       //  Validation
+       // ------------------------------------------------------------------------
+
+       private void validateFormatOptions(ReadableConfig tableOptions) {
+               boolean failOnMissingField = tableOptions.getOptional(FAIL_ON_MISSING_FIELD)
+                               .orElse(false);
+               boolean ignoreParseErrors = tableOptions.getOptional(IGNORE_PARSE_ERRORS)
+                               .orElse(false);

Review comment:
       Can be simplified to:
   
   ```java
   boolean failOnMissingField = tableOptions.get(FAIL_ON_MISSING_FIELD);
   boolean ignoreParseErrors = tableOptions.get(IGNORE_PARSE_ERRORS);
   ```
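
       Both options declare `defaultValue(false)`, so `get()` already falls back to `false` when an option is not set, and the explicit `getOptional(...).orElse(false)` is redundant.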




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

