This is an automated email from the ASF dual-hosted git repository. lamberliu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
The following commit(s) were added to refs/heads/master by this push: new ecb766e Revert "[TUBEMQ-197] Support TubeMQ connector for Apache Flink (#157)" ecb766e is described below commit ecb766e4fc6fa94338f8b930060ec4cd03e5dfc7 Author: lamberliu <55134155+lamber...@users.noreply.github.com> AuthorDate: Thu Jun 18 10:47:54 2020 +0800 Revert "[TUBEMQ-197] Support TubeMQ connector for Apache Flink (#157)" This reverts commit a770cdab0c4a7faf4c831c5f6397bcf923c82725. --- .../flink/connectors/tubemq/TubemqOptions.java | 11 -- .../connectors/tubemq/TubemqSinkFunction.java | 181 --------------------- .../flink/connectors/tubemq/TubemqTableSink.java | 139 ---------------- ...kFactory.java => TubemqTableSourceFactory.java} | 55 +------ 4 files changed, 2 insertions(+), 384 deletions(-) diff --git a/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqOptions.java b/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqOptions.java index d37dbe0..8be20d3 100644 --- a/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqOptions.java +++ b/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqOptions.java @@ -31,17 +31,6 @@ public class TubemqOptions { .noDefaultValue() .withDescription("The session key for this consumer group at startup."); - public static final ConfigOption<String> TID = - ConfigOptions.key("topic.tid") - .noDefaultValue() - .withDescription("The tid owned this topic."); - - public static final ConfigOption<Integer> MAX_RETRIES = - ConfigOptions.key("max.retries") - .defaultValue(5) - .withDescription("The maximum number of retries when an " + - "exception is caught."); - public static final ConfigOption<Boolean> BOOTSTRAP_FROM_MAX = ConfigOptions.key("bootstrap.from.max") .defaultValue(false) diff --git a/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqSinkFunction.java b/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqSinkFunction.java deleted file mode 100644 index 425e16b..0000000 --- a/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqSinkFunction.java +++ /dev/null @@ -1,181 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.connectors.tubemq; - -import static org.apache.flink.connectors.tubemq.TubemqOptions.MAX_RETRIES; - -import java.io.IOException; -import java.text.SimpleDateFormat; -import java.util.Date; -import java.util.HashSet; - -import org.apache.commons.lang3.StringUtils; -import org.apache.flink.api.common.serialization.SerializationSchema; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.state.FunctionInitializationContext; -import org.apache.flink.runtime.state.FunctionSnapshotContext; -import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; -import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; -import org.apache.flink.util.ExceptionUtils; -import org.apache.flink.util.Preconditions; -import org.apache.tubemq.client.config.TubeClientConfig; -import org.apache.tubemq.client.factory.MessageSessionFactory; -import org.apache.tubemq.client.factory.TubeSingleSessionFactory; -import org.apache.tubemq.client.producer.MessageProducer; -import org.apache.tubemq.client.producer.MessageSentResult; -import org.apache.tubemq.corebase.Message; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class TubemqSinkFunction<T> extends RichSinkFunction<T> implements CheckpointedFunction { - - private static final Logger LOG = LoggerFactory.getLogger(TubemqSinkFunction.class); - - private static final String SYSTEM_HEADER_TIME_FORMAT = "yyyyMMddHHmm"; - - /** - * The address of tubemq master, format eg: 127.0.0.1:8080,127.0.0.2:8081. - */ - private final String masterAddress; - - /** - * The topic name. - */ - private final String topic; - - /** - * The tid of this topic - */ - private final String tid; - /** - * The serializer for the records sent to pulsar. - */ - private final SerializationSchema<T> serializationSchema; - - /** - * The tubemq producer. - */ - private transient MessageProducer producer; - - /** - * The tubemq session factory. - */ - private transient MessageSessionFactory sessionFactory; - - /** - * The maximum number of retries. - */ - private final int maxRetries; - - public TubemqSinkFunction(String topic, - String masterAddress, - SerializationSchema<T> serializationSchema, - Configuration configuration) { - Preconditions.checkNotNull(topic, - "The topic must not be null."); - Preconditions.checkNotNull(masterAddress, - "The master address must not be null."); - Preconditions.checkNotNull(serializationSchema, - "The serialization schema must not be null."); - Preconditions.checkNotNull(configuration, - "The configuration must not be null."); - - this.topic = topic; - this.masterAddress = masterAddress; - this.serializationSchema = serializationSchema; - this.tid = configuration.getString(TubemqOptions.TID); - this.maxRetries = configuration.getInteger(MAX_RETRIES); - } - - @Override - public void snapshotState(FunctionSnapshotContext functionSnapshotContext) { - // Nothing to do. - } - - @Override - public void initializeState(FunctionInitializationContext functionInitializationContext) { - // Nothing to do. - } - - @Override - public void open(Configuration parameters) throws Exception { - super.open(parameters); - - TubeClientConfig tubeClientConfig = new TubeClientConfig(masterAddress); - this.sessionFactory = new TubeSingleSessionFactory(tubeClientConfig); - this.producer = sessionFactory.createProducer(); - HashSet<String> hashSet = new HashSet<>(); - hashSet.add(topic); - producer.publish(hashSet); - } - - @Override - public void invoke(T in, Context context) throws Exception { - - int retries = 0; - Exception exception = null; - - while (maxRetries <= 0 || retries < maxRetries) { - - try { - byte[] body = serializationSchema.serialize(in); - Message message = new Message(topic, body); - if (StringUtils.isNotBlank(tid)) { - SimpleDateFormat sdf = new SimpleDateFormat(SYSTEM_HEADER_TIME_FORMAT); - long currTimeMillis = System.currentTimeMillis(); - message.putSystemHeader(tid, sdf.format(new Date(currTimeMillis))); - } - - MessageSentResult sendResult = producer.sendMessage(message); - if (sendResult.isSuccess()) { - return; - } else { - LOG.warn("Send msg fail, error code: {}, error message: {}", - sendResult.getErrCode(), sendResult.getErrMsg()); - } - } catch (Exception e) { - LOG.warn("Could not properly send the message to hippo " + - "(retries: {}).", retries, e); - - retries++; - exception = ExceptionUtils.firstOrSuppressed(e, exception); - } - } - - throw new IOException("Could not properly send the message to hippo.", exception); - } - - @Override - public void close() throws Exception { - - try { - if (producer != null) { - producer.shutdown(); - producer = null; - } - if (sessionFactory != null) { - sessionFactory.shutdown(); - sessionFactory = null; - } - } catch (Throwable e) { - LOG.error("Shutdown producer error", e); - } finally { - super.close(); - } - } -} diff --git a/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqTableSink.java b/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqTableSink.java deleted file mode 100644 index cf40879..0000000 --- a/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqTableSink.java +++ /dev/null @@ -1,139 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.connectors.tubemq; - -import static org.apache.flink.util.Preconditions.checkNotNull; - -import java.util.Arrays; - -import org.apache.flink.api.common.serialization.SerializationSchema; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.datastream.DataStreamSink; -import org.apache.flink.streaming.api.functions.sink.SinkFunction; -import org.apache.flink.table.api.TableSchema; -import org.apache.flink.table.api.ValidationException; -import org.apache.flink.table.sinks.AppendStreamTableSink; -import org.apache.flink.table.utils.TableConnectorUtils; -import org.apache.flink.types.Row; - -/** - * Tubemq {@link org.apache.flink.table.sinks.StreamTableSink}. - */ -public class TubemqTableSink implements AppendStreamTableSink<Row> { - - /** - * Serialization schema for records to tubemq. - */ - private final SerializationSchema<Row> serializationSchema; - - /** - * The schema of the table. - */ - private final TableSchema schema; - - /** - * The tubemq topic name. - */ - private final String topic; - - /** - * The address of tubemq master, format eg: 127.0.0.1:8080,127.0.0.2:8081 . - */ - private final String masterAddress; - - /** - * The parameters collection for tubemq producer. - */ - private final Configuration configuration; - - public TubemqTableSink( - SerializationSchema<Row> serializationSchema, - TableSchema schema, - String topic, - String masterAddress, - Configuration configuration - ) { - this.serializationSchema = checkNotNull(serializationSchema, - "The deserialization schema must not be null."); - this.schema = checkNotNull(schema, - "The schema must not be null."); - this.topic = checkNotNull(topic, - "Topic must not be null."); - this.masterAddress = checkNotNull(masterAddress, - "Master address must not be null."); - this.configuration = checkNotNull(configuration, - "The configuration must not be null."); - } - - @Override - public void emitDataStream(DataStream<Row> dataStream) { - consumeDataStream(dataStream); - } - - @Override - public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) { - - final SinkFunction<Row> tubemqSinkFunction = - new TubemqSinkFunction<>( - topic, - masterAddress, - serializationSchema, - configuration - ); - - return dataStream - .addSink(tubemqSinkFunction) - .name( - TableConnectorUtils.generateRuntimeName( - getClass(), - getFieldNames() - ) - ); - } - - @Override - public TypeInformation<Row> getOutputType() { - return schema.toRowType(); - } - - @Override - public String[] getFieldNames() { - return schema.getFieldNames(); - } - - @Override - public TypeInformation<?>[] getFieldTypes() { - return schema.getFieldTypes(); - } - - @Override - public TubemqTableSink configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) { - if (!Arrays.equals(getFieldNames(), fieldNames) - || !Arrays.equals(getFieldTypes(), fieldTypes)) { - throw new ValidationException("Reconfiguration with different fields is not allowed. " + - "Expected: " + Arrays.toString(getFieldNames()) - + " / " + Arrays.toString(getFieldTypes()) + ". " + - "But was: " + Arrays.toString(fieldNames) + " / " + Arrays.toString(fieldTypes)); - } - - return this; - } -} diff --git a/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqTableSourceSinkFactory.java b/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqTableSourceFactory.java similarity index 81% rename from tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqTableSourceSinkFactory.java rename to tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqTableSourceFactory.java index fa006ad..760a763 100644 --- a/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqTableSourceSinkFactory.java +++ b/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqTableSourceFactory.java @@ -46,17 +46,13 @@ import java.util.Optional; import java.util.TreeSet; import org.apache.flink.api.common.serialization.DeserializationSchema; -import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.configuration.Configuration; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.descriptors.DescriptorProperties; import org.apache.flink.table.descriptors.SchemaValidator; import org.apache.flink.table.factories.DeserializationSchemaFactory; -import org.apache.flink.table.factories.SerializationSchemaFactory; -import org.apache.flink.table.factories.StreamTableSinkFactory; import org.apache.flink.table.factories.StreamTableSourceFactory; import org.apache.flink.table.factories.TableFactoryService; -import org.apache.flink.table.sinks.StreamTableSink; import org.apache.flink.table.sources.RowtimeAttributeDescriptor; import org.apache.flink.table.sources.StreamTableSource; import org.apache.flink.types.Row; @@ -64,12 +60,11 @@ import org.apache.flink.types.Row; /** * Factory for creating configured instances of {@link TubemqTableSource}. */ -public class TubemqTableSourceSinkFactory implements StreamTableSourceFactory<Row>, - StreamTableSinkFactory<Row> { +public class TubemqTableSourceFactory implements StreamTableSourceFactory<Row> { private static final String SPLIT_COMMA = ","; - private TubemqTableSourceSinkFactory() { + private TubemqTableSourceFactory() { } @Override @@ -171,52 +166,6 @@ public class TubemqTableSourceSinkFactory implements StreamTableSourceFactory<Ro ); } - @Override - public StreamTableSink<Row> createStreamTableSink( - Map<String, String> properties - ) { - final SerializationSchema<Row> serializationSchema = - getSerializationSchema(properties); - - final DescriptorProperties descriptorProperties = - new DescriptorProperties(true); - descriptorProperties.putProperties(properties); - - validateProperties(descriptorProperties); - - final TableSchema tableSchema = - descriptorProperties.getTableSchema(SCHEMA); - final String topic = - descriptorProperties.getString(TubemqValidator.CONNECTOR_TOPIC); - final String masterAddress = - descriptorProperties.getString(TubemqValidator.CONNECTOR_MASTER); - - final Configuration configuration = - getConfiguration(descriptorProperties); - - return new TubemqTableSink( - serializationSchema, - tableSchema, - topic, - masterAddress, - configuration - ); - } - - private SerializationSchema<Row> getSerializationSchema( - Map<String, String> properties - ) { - @SuppressWarnings("unchecked") - final SerializationSchemaFactory<Row> formatFactory = - TableFactoryService.find( - SerializationSchemaFactory.class, - properties, - this.getClass().getClassLoader() - ); - - return formatFactory.createSerializationSchema(properties); - } - private void validateProperties(DescriptorProperties descriptorProperties) { new SchemaValidator(true, false, false).validate(descriptorProperties); new TubemqValidator().validate(descriptorProperties);