fapaul commented on a change in pull request #17897:
URL: https://github.com/apache/flink/pull/17897#discussion_r757582207



##########
File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalLegacySinkRule.scala
##########
@@ -53,12 +52,14 @@ class BatchPhysicalLegacySinkRule extends ConverterRule(
             val dynamicPartIndices =
               dynamicPartFields.map(partitionSink.getTableSchema.getFieldNames.indexOf(_))
 
+            // TODO This option is hardcoded to remove the dependency of planner from

Review comment:
       Do we already have a follow-up ticket to introduce a separate option?

##########
File path: flink-connectors/flink-connector-elasticsearch7/pom.xml
##########
@@ -51,6 +51,12 @@ under the License.
                        <scope>provided</scope>
                </dependency>
 
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-connector-base</artifactId>
+                       <version>${project.version}</version>
+               </dependency>

Review comment:
       Why did you add this dependency?

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
##########
@@ -691,14 +689,27 @@ private static ValidationException enrichNoMatchingConnectorError(
     }
 
     private static List<Factory> discoverFactories(ClassLoader classLoader) {
-        try {
-            final List<Factory> result = new LinkedList<>();
-            ServiceLoader.load(Factory.class, classLoader).iterator().forEachRemaining(result::add);
-            return result;
-        } catch (ServiceConfigurationError e) {
-            LOG.error("Could not load service provider for factories.", e);
-            throw new TableException("Could not load service provider for factories.", e);
-        }
+        final List<Factory> result = new LinkedList<>();
+        ServiceLoaderUtil.load(Factory.class, classLoader)
+                .forEachRemaining(
+                        loadResult -> {
+                            if (loadResult.failed()) {
+                                if (loadResult.getError() instanceof NoClassDefFoundError) {

Review comment:
       Can we be more precise about which error we want to catch? So far we only want to catch the exception for the `flink-connector-files` interfaces but I do not think we should continue in general for `NoClassDefFoundError`.
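
       For illustration, a minimal sketch of a narrower check. It assumes the classes that may legitimately be missing all live under the `org.apache.flink.connector.file` package that appears elsewhere in this PR; the exact prefix to match is an assumption, not something the PR pins down:

```java
if (loadResult.failed()) {
    final Throwable error = loadResult.getError();
    final boolean missingFileConnectorClass =
            error instanceof NoClassDefFoundError
                    && error.getMessage() != null
                    // NoClassDefFoundError reports the internal, slash-separated name.
                    && error.getMessage().contains("org/apache/flink/connector/file");
    if (missingFileConnectorClass) {
        LOG.debug("Skipped a factory that requires flink-connector-files.", error);
    } else {
        throw new TableException("Unexpected error when loading a factory.", error);
    }
}
```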

##########
File path: flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java
##########
@@ -343,7 +343,10 @@ private RowDataPartitionComputer partitionComputer() {
             @Override
             public DynamicTableSource.DataStructureConverter createDataStructureConverter(
                     DataType producedDataType) {
-                throw new TableException("Compaction reader not support DataStructure converter.");
+                // This method cannot be implemented without changing the
+                // DynamicTableSink.DataStructureConverter interface
+                throw new UnsupportedOperationException(

Review comment:
       Any idea why this exception was a `TableException` before?

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/ServiceLoaderUtil.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import java.util.Iterator;
+import java.util.ServiceLoader;
+
+class ServiceLoaderUtil {
+
+    static <T> Iterator<LoadResult<T>> load(Class<T> clazz, ClassLoader classLoader) {
+        return new SafeIterator<>(ServiceLoader.load(clazz, classLoader).iterator());
+    }
+
+    static class LoadResult<T> {
+        private final T service;
+        private final Throwable error;
+
+        private LoadResult(T service, Throwable error) {
+            this.service = service;
+            this.error = error;
+        }
+
+        private LoadResult(T service) {
+            this(service, null);
+        }
+
+        private LoadResult(Throwable error) {
+            this(null, error);
+        }
+
+        public boolean failed() {
+            return error != null;
+        }
+
+        public Throwable getError() {
+            return error;
+        }
+
+        public T getService() {
+            return service;
+        }
+    }
+
+    /**
+     * This iterator wraps {@link Iterator#hasNext()} and {@link Iterator#next()} in try-catch, and
+     * returns {@link LoadResult} to handle such failures.
+     */
+    private static class SafeIterator<T> implements Iterator<LoadResult<T>> {
+
+        private final Iterator<T> iterator;
+
+        public SafeIterator(Iterator<T> iterator) {
+            this.iterator = iterator;
+        }
+
+        @Override
+        public boolean hasNext() {
+            try {
+                return iterator.hasNext();
+            } catch (Throwable t) {
+                return true; // It has the exception!
+            }
+        }
+
+        @Override
+        public LoadResult<T> next() {
+            try {
+                if (iterator.hasNext()) {
+                    return new LoadResult<>(iterator.next());
+                }
+                return null;

Review comment:
       This is very unusual and somewhat contradicts the `Iterator` interface. If the iterator does not have a next element it should throw a `NoSuchElementException`.
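
       For illustration, a minimal sketch of a contract-conforming `next()` that keeps the error-capturing behaviour (it additionally needs `import java.util.NoSuchElementException;`); exhaustion is reported to the caller instead of being encoded as `null`:

```java
@Override
public LoadResult<T> next() {
    try {
        if (!iterator.hasNext()) {
            // Conform to the Iterator contract instead of returning null.
            throw new NoSuchElementException();
        }
        return new LoadResult<>(iterator.next());
    } catch (NoSuchElementException e) {
        throw e; // genuine exhaustion, not a service-loading failure
    } catch (Throwable t) {
        // ServiceLoader's iterator may throw while resolving a provider class.
        return new LoadResult<>(t);
    }
}
```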

##########
File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverter.java
##########
@@ -113,4 +113,13 @@ public static int computeMinBytesForDecimalPrecision(int precision) {
         }
         return numBytes;
     }
+
+    // From DecimalDataUtils
+    public static boolean is32BitDecimal(int precision) {

Review comment:
       Not sure I like this change because we have now duplicated the code and the methods in `DecimalDataUtils` appear unused. WDYT about moving `DecimalDataUtils` to table-common, because I can imagine other formats also want to use it?

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/ServiceLoaderUtil.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import java.util.Iterator;
+import java.util.ServiceLoader;
+
+class ServiceLoaderUtil {
+
+    static <T> Iterator<LoadResult<T>> load(Class<T> clazz, ClassLoader classLoader) {
+        return new SafeIterator<>(ServiceLoader.load(clazz, classLoader).iterator());
+    }
+
+    static class LoadResult<T> {
+        private final T service;
+        private final Throwable error;
+
+        private LoadResult(T service, Throwable error) {
+            this.service = service;
+            this.error = error;
+        }
+
+        private LoadResult(T service) {
+            this(service, null);
+        }
+
+        private LoadResult(Throwable error) {
+            this(null, error);
+        }
+
+        public boolean failed() {
+            return error != null;
+        }
+
+        public Throwable getError() {
+            return error;
+        }
+
+        public T getService() {
+            return service;
+        }
+    }
+
+    /**
+     * This iterator wraps {@link Iterator#hasNext()} and {@link Iterator#next()} in try-catch, and
+     * returns {@link LoadResult} to handle such failures.
+     */
+    private static class SafeIterator<T> implements Iterator<LoadResult<T>> {
+
+        private final Iterator<T> iterator;
+
+        public SafeIterator(Iterator<T> iterator) {
+            this.iterator = iterator;
+        }
+
+        @Override
+        public boolean hasNext() {
+            try {
+                return iterator.hasNext();
+            } catch (Throwable t) {

Review comment:
       What kind of `Throwable` do we expect here?
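
       For illustration, a sketch that makes the expectation explicit, assuming `ServiceConfigurationError` (what `ServiceLoader` throws for a broken provider) and `NoClassDefFoundError` are the only errors we intend to survive; anything else would propagate. It needs `import java.util.ServiceConfigurationError;`:

```java
@Override
public boolean hasNext() {
    try {
        return iterator.hasNext();
    } catch (ServiceConfigurationError | NoClassDefFoundError e) {
        // Defer reporting: next() will hit the same error and wrap it
        // in a LoadResult instead of letting it escape here.
        return true;
    }
}
```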

##########
File path: flink-connectors/flink-connector-files/pom.xml
##########
@@ -58,6 +58,22 @@ under the License.
                        <version>${project.version}</version>
                </dependency>
 
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-streaming-java</artifactId>
+                       <version>${project.version}</version>
+                       <scope>provided</scope>
+                       <optional>true</optional>

Review comment:
       I do not think this dependency is ever optional.

##########
File path: flink-table/flink-table-uber/pom.xml
##########
@@ -88,6 +88,11 @@ under the License.
                        <artifactId>flink-cep</artifactId>
                        <version>${project.version}</version>
                </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>

Review comment:
       Why do we include the file connector in the uber-jar again?

##########
File path: flink-table/flink-table-runtime/src/test/java/org/apache/flink/formats/testcsv/TestCsvDeserializationSchema.java
##########
@@ -16,53 +16,65 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.filesystem;
+package org.apache.flink.formats.testcsv;
 
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.conversion.DataStructureConverter;
-import org.apache.flink.table.data.conversion.DataStructureConverters;
-import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalTypeRoot;
-import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.types.parser.FieldParser;
 import org.apache.flink.util.InstantiationUtil;
 
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.util.List;
+import java.util.function.Function;
 
 /**
  * The {@link DeserializationSchema} that output {@link RowData}.
  *
  * <p>NOTE: This is meant only for testing purpose and doesn't provide a feature complete stable csv
  * parser! If you need a feature complete CSV parser, check out the flink-csv package.
  */
-public class TestCsvDeserializationSchema implements DeserializationSchema<RowData> {
+class TestCsvDeserializationSchema implements DeserializationSchema<RowData> {
 
     private final List<DataType> physicalFieldTypes;
     private final int physicalFieldCount;
 
     private final TypeInformation<RowData> typeInfo;
     private final int[] indexMapping;
 
-    @SuppressWarnings("rawtypes")
-    private transient DataStructureConverter[] csvRowToRowDataConverters;
+    private final DynamicTableSource.DataStructureConverter[] csvRowToRowDataConverters;
 
     private transient FieldParser<?>[] fieldParsers;
 
-    public TestCsvDeserializationSchema(DataType physicalDataType, List<String> orderedCsvColumns) {
+    public TestCsvDeserializationSchema(
+            DataType physicalDataType,
+            TypeInformation<RowData> typeInfo,
+            List<String> orderedCsvColumns,
+            Function<DataType, DynamicTableSource.DataStructureConverter> converterFactory) {
         this.physicalFieldTypes = DataType.getFieldDataTypes(physicalDataType);
         this.physicalFieldCount = physicalFieldTypes.size();
-        this.typeInfo = InternalTypeInfo.of((RowType) physicalDataType.getLogicalType());
+        this.typeInfo = typeInfo;
 
         List<String> physicalFieldNames = DataType.getFieldNames(physicalDataType);
         this.indexMapping =
                 orderedCsvColumns.stream().mapToInt(physicalFieldNames::indexOf).toArray();
 
+        // Init data converters
+        int csvRowLength = indexMapping.length;
+        this.csvRowToRowDataConverters =
+                new DynamicTableSource.DataStructureConverter[csvRowLength];
+        for (int csvColumn = 0; csvColumn < csvRowLength; csvColumn++) {
+            if (indexMapping[csvColumn] != -1) {
+                DataType fieldType = physicalFieldTypes.get(indexMapping[csvColumn]);
+                this.csvRowToRowDataConverters[csvColumn] = converterFactory.apply(fieldType);
+            }
+        }

Review comment:
       @matriv @twalthr Can someone with more table knowledge also take a look at these changes? They look right and the tests are passing but I want to be sure.

##########
File path: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/compact/CompactContext.java
##########
@@ -7,17 +7,16 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *   http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.

Review comment:
       Hotfix commit?

##########
File path: 
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/compact/CompactOperator.java
##########
@@ -68,7 +64,8 @@
  * the file system.
  */
 public class CompactOperator<T> extends AbstractStreamOperator<PartitionCommitInfo>

Review comment:
       Is this class supposed to be used by users? If not, we should mark it as `@Internal`.
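
       For reference, a fragment sketch of the suggested change, assuming the usual `org.apache.flink.annotation.Internal` annotation is available in this module (the rest of the class declaration is elided as in the hunk above):

```java
import org.apache.flink.annotation.Internal;

/** Marked internal: not part of the user-facing API surface. */
@Internal
public class CompactOperator<T> extends AbstractStreamOperator<PartitionCommitInfo>
```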

##########
File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalLegacySinkRule.scala
##########
@@ -53,12 +52,14 @@ class BatchPhysicalLegacySinkRule extends ConverterRule(
             val dynamicPartIndices =
               dynamicPartFields.map(partitionSink.getTableSchema.getFieldNames.indexOf(_))
 
+            // TODO This option is hardcoded to remove the dependency of planner from

Review comment:
       @JingsongLi do you know if this option is still actively used?

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
##########
@@ -691,14 +689,27 @@ private static ValidationException enrichNoMatchingConnectorError(
     }
 
     private static List<Factory> discoverFactories(ClassLoader classLoader) {
-        try {
-            final List<Factory> result = new LinkedList<>();
-            ServiceLoader.load(Factory.class, classLoader).iterator().forEachRemaining(result::add);
-            return result;
-        } catch (ServiceConfigurationError e) {
-            LOG.error("Could not load service provider for factories.", e);
-            throw new TableException("Could not load service provider for factories.", e);
-        }
+        final List<Factory> result = new LinkedList<>();
+        ServiceLoaderUtil.load(Factory.class, classLoader)
+                .forEachRemaining(
+                        loadResult -> {
+                            if (loadResult.failed()) {
+                                if (loadResult.getError() instanceof NoClassDefFoundError) {
+                                    LOG.debug(

Review comment:
       We should definitely write a test to verify that the discovery does not fail if the base interface is not found.
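
       A minimal sketch of such a test, assuming `ServiceLoaderUtil` stays package-private and is tested from the same package; `BrokenServicesClassLoader` is a hypothetical helper that registers a provider class which cannot be resolved, so discovery must report a failed `LoadResult` rather than throw:

```java
package org.apache.flink.table.factories;

import org.junit.Test;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.net.URLConnection;
import java.net.URLStreamHandler;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Enumeration;
import java.util.Iterator;

import static org.junit.Assert.assertTrue;

public class ServiceLoaderUtilTest {

    /** Hypothetical helper: serves a services file that names an unresolvable class. */
    private static final class BrokenServicesClassLoader extends ClassLoader {
        @Override
        public Enumeration<URL> getResources(String name) throws IOException {
            if (("META-INF/services/" + Factory.class.getName()).equals(name)) {
                URLStreamHandler handler =
                        new URLStreamHandler() {
                            @Override
                            protected URLConnection openConnection(URL u) {
                                return new URLConnection(u) {
                                    @Override
                                    public void connect() {}

                                    @Override
                                    public InputStream getInputStream() {
                                        // A provider entry pointing at a missing class.
                                        return new ByteArrayInputStream(
                                                "does.not.Exist".getBytes(StandardCharsets.UTF_8));
                                    }
                                };
                            }
                        };
                URL url = new URL("memory", null, -1, "/broken-services", handler);
                return Collections.enumeration(Collections.singletonList(url));
            }
            return super.getResources(name);
        }
    }

    @Test
    public void discoveryReportsUnloadableServiceInsteadOfThrowing() {
        Iterator<ServiceLoaderUtil.LoadResult<Factory>> it =
                ServiceLoaderUtil.load(Factory.class, new BrokenServicesClassLoader());
        boolean sawFailure = false;
        while (it.hasNext()) {
            ServiceLoaderUtil.LoadResult<Factory> result = it.next();
            sawFailure |= result != null && result.failed();
        }
        assertTrue("Expected the load error to be surfaced, not thrown", sawFailure);
    }
}
```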

##########
File path: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/BinPacking.java
##########
@@ -7,17 +7,16 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *   http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */

Review comment:
       Nit: I guess this can be a hotfix commit ;) 



