dawidwys commented on a change in pull request #9382: [FLINK-13600][table]
Rework TableEnvironment.connect() class hierarchy
URL: https://github.com/apache/flink/pull/9382#discussion_r311917736
##########
File path:
flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/TableDescriptor.java
##########
@@ -19,11 +19,132 @@
package org.apache.flink.table.descriptors;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+
+import static
org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE;
+import static
org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE_VALUE_APPEND;
+import static
org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE_VALUE_RETRACT;
+import static
org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE_VALUE_UPSERT;
/**
- * Common class for all descriptors describing table sources and sinks.
- */
+ * Describes a table consisting of a connector (in a given update mode) and a
format.
+ */
@PublicEvolving
-public abstract class TableDescriptor extends DescriptorBase {
+public abstract class TableDescriptor<D extends TableDescriptor<D>> extends
DescriptorBase {
+
+ private final ConnectorDescriptor connectorDescriptor;
+
+ private @Nullable FormatDescriptor formatDescriptor;
+
+ private @Nullable String updateMode;
+
+ protected TableDescriptor(ConnectorDescriptor connectorDescriptor) {
+ this.connectorDescriptor =
Preconditions.checkNotNull(connectorDescriptor, "Connector must not be null.");
+ }
+
+ /**
+ * Specifies the format that defines how to read data from a connector.
+ */
+ @SuppressWarnings("unchecked")
+ public D withFormat(FormatDescriptor format) {
+ formatDescriptor = Preconditions.checkNotNull(format, "Format
must not be null.");
+ return (D) this;
+ }
+
+ /**
+ * Declares how to perform the conversion between a dynamic table and
an external connector.
+ *
+ * <p>In append mode, a dynamic table and an external connector only
exchange INSERT messages.
+ *
+ * @see #inRetractMode()
+ * @see #inUpsertMode()
+ */
+ @SuppressWarnings("unchecked")
+ public D inAppendMode() {
+ updateMode = UPDATE_MODE_VALUE_APPEND;
+ return (D) this;
+ }
+
+ /**
+ * Declares how to perform the conversion between a dynamic table and
an external connector.
+ *
+ * <p>In retract mode, a dynamic table and an external connector
exchange ADD and RETRACT messages.
+ *
+ * <p>An INSERT change is encoded as an ADD message, a DELETE change as
a RETRACT message, and an
+ * UPDATE change as a RETRACT message for the updated (previous) row
and an ADD message for
+ * the updating (new) row.
+ *
+ * <p>In this mode, a key must not be defined as opposed to upsert
mode. However, every update
+ * consists of two messages which is less efficient.
+ *
+ * @see #inAppendMode()
+ * @see #inUpsertMode()
+ */
+ @SuppressWarnings("unchecked")
+ public D inRetractMode() {
+ updateMode = UPDATE_MODE_VALUE_RETRACT;
+ return (D) this;
+ }
+
+ /**
+ * Declares how to perform the conversion between a dynamic table and
an external connector.
+ *
+ * <p>In upsert mode, a dynamic table and an external connector
exchange UPSERT and DELETE messages.
+ *
+ * <p>This mode requires a (possibly composite) unique key by which
updates can be propagated. The
+ * external connector needs to be aware of the unique key attribute in
order to apply messages
+ * correctly. INSERT and UPDATE changes are encoded as UPSERT messages.
DELETE changes as
+ * DELETE messages.
+ *
+ * <p>The main difference to a retract stream is that UPDATE changes
are encoded with a single
+ * message and are therefore more efficient.
+ *
+ * @see #inAppendMode()
+ * @see #inRetractMode()
+ */
+ @SuppressWarnings("unchecked")
+ public D inUpsertMode() {
+ updateMode = UPDATE_MODE_VALUE_UPSERT;
+ return (D) this;
+ }
+
+ /**
+ * Converts this descriptor into a set of properties.
+ */
+ @Override
+ public Map<String, String> toProperties() {
Review comment:
This has the same problem as `ConnectTableDescriptor`: it adds an implicit contract.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services