This is an automated email from the ASF dual-hosted git repository. aljoscha pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit c4e74b57c80fb5f2d22756535eeac60e428f1645 Author: yanghua <[email protected]> AuthorDate: Sat Oct 13 11:48:51 2018 +0800 [FLINK-9697] Rename KafkaTableSink to KafkaTableSinkBase --- .../connectors/kafka/Kafka010JsonTableSink.java | 6 +++--- .../streaming/connectors/kafka/Kafka010TableSink.java | 2 +- .../connectors/kafka/Kafka010JsonTableSinkTest.java | 4 ++-- .../streaming/connectors/kafka/Kafka011TableSink.java | 2 +- .../streaming/connectors/kafka/Kafka08JsonTableSink.java | 8 ++++---- .../streaming/connectors/kafka/Kafka08TableSink.java | 2 +- .../connectors/kafka/Kafka08JsonTableSinkTest.java | 4 ++-- .../streaming/connectors/kafka/Kafka09JsonTableSink.java | 8 ++++---- .../streaming/connectors/kafka/Kafka09TableSink.java | 2 +- .../connectors/kafka/Kafka09JsonTableSinkTest.java | 4 ++-- .../streaming/connectors/kafka/KafkaJsonTableSink.java | 4 ++-- .../{KafkaTableSink.java => KafkaTableSinkBase.java} | 16 ++++++++-------- ...SinkTestBase.java => KafkaTableSinkBaseTestBase.java} | 14 +++++++------- 13 files changed, 38 insertions(+), 38 deletions(-) diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSink.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSink.java index 8471908..b9a5350 100644 --- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSink.java +++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSink.java @@ -28,7 +28,7 @@ import java.util.Optional; import java.util.Properties; /** - * Kafka 0.10 {@link KafkaTableSink} that serializes data in JSON format. + * Kafka 0.10 {@link KafkaTableSinkBase} that serializes data in JSON format. * * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together * with descriptors for schema and format instead. Descriptors allow for @@ -39,7 +39,7 @@ import java.util.Properties; public class Kafka010JsonTableSink extends KafkaJsonTableSink { /** - * Creates {@link KafkaTableSink} to write table rows as JSON-encoded records to a Kafka 0.10 + * Creates {@link KafkaTableSinkBase} to write table rows as JSON-encoded records to a Kafka 0.10 * topic with fixed partition assignment. * * <p>Each parallel TableSink instance will write its rows to a single Kafka partition.</p> @@ -60,7 +60,7 @@ public class Kafka010JsonTableSink extends KafkaJsonTableSink { } /** - * Creates {@link KafkaTableSink} to write table rows as JSON-encoded records to a Kafka 0.10 + * Creates {@link KafkaTableSinkBase} to write table rows as JSON-encoded records to a Kafka 0.10 * topic with custom partition assignment. * * @param topic topic in Kafka to which table is written diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSink.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSink.java index 1d408b8..79aad7c 100644 --- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSink.java +++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSink.java @@ -31,7 +31,7 @@ import java.util.Properties; * Kafka 0.10 table sink for writing data into Kafka. */ @Internal -public class Kafka010TableSink extends KafkaTableSink { +public class Kafka010TableSink extends KafkaTableSinkBase { public Kafka010TableSink( TableSchema schema, diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSinkTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSinkTest.java index 9208f65..4575f87 100644 --- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSinkTest.java +++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSinkTest.java @@ -32,10 +32,10 @@ import java.util.Properties; * drop support for format-specific table sinks. */ @Deprecated -public class Kafka010JsonTableSinkTest extends KafkaTableSinkTestBase { +public class Kafka010JsonTableSinkTest extends KafkaTableSinkBaseTestBase { @Override - protected KafkaTableSink createTableSink( + protected KafkaTableSinkBase createTableSink( String topic, Properties properties, FlinkKafkaPartitioner<Row> partitioner) { diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSink.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSink.java index 8d81a5b..304b26b 100644 --- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSink.java +++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSink.java @@ -33,7 +33,7 @@ import java.util.Properties; * Kafka 0.11 table sink for writing data into Kafka. */ @Internal -public class Kafka011TableSink extends KafkaTableSink { +public class Kafka011TableSink extends KafkaTableSinkBase { public Kafka011TableSink( TableSchema schema, diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java index 189a9fd..b7474e2 100644 --- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java +++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java @@ -30,7 +30,7 @@ import java.util.Optional; import java.util.Properties; /** - * Kafka 0.8 {@link KafkaTableSink} that serializes data in JSON format. + * Kafka 0.8 {@link KafkaTableSinkBase} that serializes data in JSON format. * * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together * with descriptors for schema and format instead. Descriptors allow for @@ -41,7 +41,7 @@ import java.util.Properties; public class Kafka08JsonTableSink extends KafkaJsonTableSink { /** - * Creates {@link KafkaTableSink} to write table rows as JSON-encoded records to a Kafka 0.8 + * Creates {@link KafkaTableSinkBase} to write table rows as JSON-encoded records to a Kafka 0.8 * topic with fixed partition assignment. * * <p>Each parallel TableSink instance will write its rows to a single Kafka partition.</p> @@ -62,7 +62,7 @@ public class Kafka08JsonTableSink extends KafkaJsonTableSink { } /** - * Creates {@link KafkaTableSink} to write table rows as JSON-encoded records to a Kafka 0.8 + * Creates {@link KafkaTableSinkBase} to write table rows as JSON-encoded records to a Kafka 0.8 * topic with custom partition assignment. * * @param topic topic in Kafka to which table is written @@ -76,7 +76,7 @@ public class Kafka08JsonTableSink extends KafkaJsonTableSink { } /** - * Creates {@link KafkaTableSink} to write table rows as JSON-encoded records to a Kafka 0.8 + * Creates {@link KafkaTableSinkBase} to write table rows as JSON-encoded records to a Kafka 0.8 * topic with custom partition assignment. * * @param topic topic in Kafka to which table is written diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSink.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSink.java index 146cfc9..90c4258 100644 --- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSink.java +++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSink.java @@ -31,7 +31,7 @@ import java.util.Properties; * Kafka 0.8 table sink for writing data into Kafka. */ @Internal -public class Kafka08TableSink extends KafkaTableSink { +public class Kafka08TableSink extends KafkaTableSinkBase { public Kafka08TableSink( TableSchema schema, diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java index fc46ad4..aa5fa16 100644 --- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java +++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java @@ -32,10 +32,10 @@ import java.util.Properties; * drop support for format-specific table sinks. */ @Deprecated -public class Kafka08JsonTableSinkTest extends KafkaTableSinkTestBase { +public class Kafka08JsonTableSinkTest extends KafkaTableSinkBaseTestBase { @Override - protected KafkaTableSink createTableSink( + protected KafkaTableSinkBase createTableSink( String topic, Properties properties, FlinkKafkaPartitioner<Row> partitioner) { diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java index 3363459..cd27f83 100644 --- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java +++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java @@ -30,7 +30,7 @@ import java.util.Optional; import java.util.Properties; /** - * Kafka 0.9 {@link KafkaTableSink} that serializes data in JSON format. + * Kafka 0.9 {@link KafkaTableSinkBase} that serializes data in JSON format. * * @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together * with descriptors for schema and format instead. Descriptors allow for @@ -41,7 +41,7 @@ import java.util.Properties; public class Kafka09JsonTableSink extends KafkaJsonTableSink { /** - * Creates {@link KafkaTableSink} to write table rows as JSON-encoded records to a Kafka 0.9 + * Creates {@link KafkaTableSinkBase} to write table rows as JSON-encoded records to a Kafka 0.9 * topic with fixed partition assignment. * * <p>Each parallel TableSink instance will write its rows to a single Kafka partition.</p> @@ -62,7 +62,7 @@ public class Kafka09JsonTableSink extends KafkaJsonTableSink { } /** - * Creates {@link KafkaTableSink} to write table rows as JSON-encoded records to a Kafka 0.9 + * Creates {@link KafkaTableSinkBase} to write table rows as JSON-encoded records to a Kafka 0.9 * topic with custom partition assignment. * * @param topic topic in Kafka to which table is written @@ -76,7 +76,7 @@ public class Kafka09JsonTableSink extends KafkaJsonTableSink { } /** - * Creates {@link KafkaTableSink} to write table rows as JSON-encoded records to a Kafka 0.9 + * Creates {@link KafkaTableSinkBase} to write table rows as JSON-encoded records to a Kafka 0.9 * topic with custom partition assignment. * * @param topic topic in Kafka to which table is written diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSink.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSink.java index 6e38aad..657d75a 100644 --- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSink.java +++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSink.java @@ -31,7 +31,7 @@ import java.util.Properties; * Kafka 0.9 table sink for writing data into Kafka. */ @Internal -public class Kafka09TableSink extends KafkaTableSink { +public class Kafka09TableSink extends KafkaTableSinkBase { public Kafka09TableSink( TableSchema schema, diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java index 97b5c7d..29cfa93 100644 --- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java +++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java @@ -32,10 +32,10 @@ import java.util.Properties; * drop support for format-specific table sinks. */ @Deprecated -public class Kafka09JsonTableSinkTest extends KafkaTableSinkTestBase { +public class Kafka09JsonTableSinkTest extends KafkaTableSinkBaseTestBase { @Override - protected KafkaTableSink createTableSink( + protected KafkaTableSinkBase createTableSink( String topic, Properties properties, FlinkKafkaPartitioner<Row> partitioner) { diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java index 231eddd..d84eb89 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java @@ -28,13 +28,13 @@ import org.apache.flink.types.Row; import java.util.Properties; /** - * Base class for {@link KafkaTableSink} that serializes data in JSON format. + * Base class for {@link KafkaTableSinkBase} that serializes data in JSON format. * * @deprecated Use table descriptors instead of implementation-specific classes. */ @Deprecated @Internal -public abstract class KafkaJsonTableSink extends KafkaTableSink { +public abstract class KafkaJsonTableSink extends KafkaTableSinkBase { /** * Creates KafkaJsonTableSink. diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkBase.java similarity index 94% rename from flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java rename to flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkBase.java index a85d536..acd10cc 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkBase.java @@ -43,7 +43,7 @@ import java.util.Properties; * override {@link #createKafkaProducer(String, Properties, SerializationSchema, Optional)}}. */ @Internal -public abstract class KafkaTableSink implements AppendStreamTableSink<Row> { +public abstract class KafkaTableSinkBase implements AppendStreamTableSink<Row> { // TODO make all attributes final and mandatory once we drop support for format-specific table sinks @@ -66,7 +66,7 @@ public abstract class KafkaTableSink implements AppendStreamTableSink<Row> { protected String[] fieldNames; protected TypeInformation[] fieldTypes; - protected KafkaTableSink( + protected KafkaTableSinkBase( TableSchema schema, String topic, Properties properties, @@ -81,7 +81,7 @@ public abstract class KafkaTableSink implements AppendStreamTableSink<Row> { } /** - * Creates KafkaTableSink. + * Creates KafkaTableSinkBase. * * @param topic Kafka topic to write to. * @param properties Properties for the Kafka producer. @@ -89,7 +89,7 @@ public abstract class KafkaTableSink implements AppendStreamTableSink<Row> { * @deprecated Use table descriptors instead of implementation-specific classes. */ @Deprecated - public KafkaTableSink( + public KafkaTableSinkBase( String topic, Properties properties, FlinkKafkaPartitioner<Row> partitioner) { @@ -133,7 +133,7 @@ public abstract class KafkaTableSink implements AppendStreamTableSink<Row> { * @return Deep copy of this sink */ @Deprecated - protected KafkaTableSink createCopy() { + protected KafkaTableSinkBase createCopy() { throw new UnsupportedOperationException("This method only exists for backwards compatibility."); } @@ -164,14 +164,14 @@ public abstract class KafkaTableSink implements AppendStreamTableSink<Row> { } @Override - public KafkaTableSink configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) { + public KafkaTableSinkBase configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) { if (schema.isPresent()) { // a fixed schema is defined so reconfiguration is not supported throw new UnsupportedOperationException("Reconfiguration of this sink is not supported."); } // legacy code - KafkaTableSink copy = createCopy(); + KafkaTableSinkBase copy = createCopy(); copy.fieldNames = Preconditions.checkNotNull(fieldNames, "fieldNames"); copy.fieldTypes = Preconditions.checkNotNull(fieldTypes, "fieldTypes"); Preconditions.checkArgument(fieldNames.length == fieldTypes.length, @@ -191,7 +191,7 @@ public abstract class KafkaTableSink implements AppendStreamTableSink<Row> { if (o == null || getClass() != o.getClass()) { return false; } - KafkaTableSink that = (KafkaTableSink) o; + KafkaTableSinkBase that = (KafkaTableSinkBase) o; return Objects.equals(schema, that.schema) && Objects.equals(topic, that.topic) && Objects.equals(properties, that.properties) && diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkBaseTestBase.java similarity index 90% rename from flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java rename to flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkBaseTestBase.java index b4bb89d..dbbb10f 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkBaseTestBase.java @@ -50,7 +50,7 @@ import static org.mockito.Mockito.when; * drop support for format-specific table sinks. */ @Deprecated -public abstract class KafkaTableSinkTestBase { +public abstract class KafkaTableSinkBaseTestBase { private static final String TOPIC = "testTopic"; private static final String[] FIELD_NAMES = new String[] {"field1", "field2"}; @@ -64,7 +64,7 @@ public abstract class KafkaTableSinkTestBase { DataStream dataStream = mock(DataStream.class); when(dataStream.addSink(any(SinkFunction.class))).thenReturn(mock(DataStreamSink.class)); - KafkaTableSink kafkaTableSink = spy(createTableSink()); + KafkaTableSinkBase kafkaTableSink = spy(createTableSink()); kafkaTableSink.emitDataStream(dataStream); // verify correct producer class @@ -80,8 +80,8 @@ public abstract class KafkaTableSinkTestBase { @Test public void testConfiguration() { - KafkaTableSink kafkaTableSink = createTableSink(); - KafkaTableSink newKafkaTableSink = kafkaTableSink.configure(FIELD_NAMES, FIELD_TYPES); + KafkaTableSinkBase kafkaTableSink = createTableSink(); + KafkaTableSinkBase newKafkaTableSink = kafkaTableSink.configure(FIELD_NAMES, FIELD_TYPES); assertNotSame(kafkaTableSink, newKafkaTableSink); assertArrayEquals(FIELD_NAMES, newKafkaTableSink.getFieldNames()); @@ -89,7 +89,7 @@ public abstract class KafkaTableSinkTestBase { assertEquals(new RowTypeInfo(FIELD_TYPES), newKafkaTableSink.getOutputType()); } - protected abstract KafkaTableSink createTableSink( + protected abstract KafkaTableSinkBase createTableSink( String topic, Properties properties, FlinkKafkaPartitioner<Row> partitioner); @@ -98,8 +98,8 @@ public abstract class KafkaTableSinkTestBase { protected abstract Class<? extends FlinkKafkaProducerBase> getProducerClass(); - private KafkaTableSink createTableSink() { - KafkaTableSink sink = createTableSink(TOPIC, PROPERTIES, PARTITIONER); + private KafkaTableSinkBase createTableSink() { + KafkaTableSinkBase sink = createTableSink(TOPIC, PROPERTIES, PARTITIONER); return sink.configure(FIELD_NAMES, FIELD_TYPES); }
