Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/5564#discussion_r170562304
--- Diff:
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java
---
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors;
+
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.table.api.ValidationException;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Consumer;
+
+/**
+ * The validator for {@link Kafka}.
+ */
+public class KafkaValidator extends ConnectorDescriptorValidator {
+
+ public static final String CONNECTOR_TYPE_VALUE_KAFKA = "kafka";
+ public static final String CONNECTOR_VERSION_VALUE_08 = "0.8";
+ public static final String CONNECTOR_VERSION_VALUE_09 = "0.9";
+ public static final String CONNECTOR_VERSION_VALUE_010 = "0.10";
+ public static final String CONNECTOR_VERSION_VALUE_011 = "0.11";
+ public static final String CONNECTOR_TOPIC = "connector.topic";
+ public static final String CONNECTOR_STARTUP_MODE =
"connector.startup-mode";
+ public static final String CONNECTOR_STARTUP_MODE_VALUE_EARLIEST =
"earliest-offset";
+ public static final String CONNECTOR_STARTUP_MODE_VALUE_LATEST =
"latest-offset";
+ public static final String CONNECTOR_STARTUP_MODE_VALUE_GROUP_OFFSETS =
"group-offsets";
+ public static final String
CONNECTOR_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS = "specific-offsets";
+ public static final String CONNECTOR_SPECIFIC_OFFSETS =
"connector.specific-offsets";
+ public static final String CONNECTOR_SPECIFIC_OFFSETS_PARTITION =
"partition";
+ public static final String CONNECTOR_SPECIFIC_OFFSETS_OFFSET = "offset";
+ public static final String CONNECTOR_PROPERTIES =
"connector.properties";
--- End diff --
Yes, these are very Kafka-specific properties. That's where they are also
pushed down by the table source builder.
---