This is an automated email from the ASF dual-hosted git repository.

lamberliu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git


The following commit(s) were added to refs/heads/master by this push:
     new ecb766e  Revert "[TUBEMQ-197] Support TubeMQ connector for Apache 
Flink (#157)"
ecb766e is described below

commit ecb766e4fc6fa94338f8b930060ec4cd03e5dfc7
Author: lamberliu <55134155+lamber...@users.noreply.github.com>
AuthorDate: Thu Jun 18 10:47:54 2020 +0800

    Revert "[TUBEMQ-197] Support TubeMQ connector for Apache Flink (#157)"
    
    This reverts commit a770cdab0c4a7faf4c831c5f6397bcf923c82725.
---
 .../flink/connectors/tubemq/TubemqOptions.java     |  11 --
 .../connectors/tubemq/TubemqSinkFunction.java      | 181 ---------------------
 .../flink/connectors/tubemq/TubemqTableSink.java   | 139 ----------------
 ...kFactory.java => TubemqTableSourceFactory.java} |  55 +------
 4 files changed, 2 insertions(+), 384 deletions(-)

diff --git 
a/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqOptions.java
 
b/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqOptions.java
index d37dbe0..8be20d3 100644
--- 
a/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqOptions.java
+++ 
b/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqOptions.java
@@ -31,17 +31,6 @@ public class TubemqOptions {
             .noDefaultValue()
             .withDescription("The session key for this consumer group at 
startup.");
 
-    public static final ConfigOption<String> TID =
-        ConfigOptions.key("topic.tid")
-            .noDefaultValue()
-            .withDescription("The tid owned this topic.");
-
-    public static final ConfigOption<Integer> MAX_RETRIES =
-        ConfigOptions.key("max.retries")
-            .defaultValue(5)
-            .withDescription("The maximum number of retries when an " +
-                "exception is caught.");
-
     public static final ConfigOption<Boolean> BOOTSTRAP_FROM_MAX =
         ConfigOptions.key("bootstrap.from.max")
             .defaultValue(false)
diff --git 
a/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqSinkFunction.java
 
b/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqSinkFunction.java
deleted file mode 100644
index 425e16b..0000000
--- 
a/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqSinkFunction.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.connectors.tubemq;
-
-import static org.apache.flink.connectors.tubemq.TubemqOptions.MAX_RETRIES;
-
-import java.io.IOException;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.HashSet;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.state.FunctionInitializationContext;
-import org.apache.flink.runtime.state.FunctionSnapshotContext;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.Preconditions;
-import org.apache.tubemq.client.config.TubeClientConfig;
-import org.apache.tubemq.client.factory.MessageSessionFactory;
-import org.apache.tubemq.client.factory.TubeSingleSessionFactory;
-import org.apache.tubemq.client.producer.MessageProducer;
-import org.apache.tubemq.client.producer.MessageSentResult;
-import org.apache.tubemq.corebase.Message;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class TubemqSinkFunction<T> extends RichSinkFunction<T> implements 
CheckpointedFunction {
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(TubemqSinkFunction.class);
-
-    private static final String SYSTEM_HEADER_TIME_FORMAT = "yyyyMMddHHmm";
-
-    /**
-     * The address of tubemq master, format eg: 127.0.0.1:8080,127.0.0.2:8081.
-     */
-    private final String masterAddress;
-
-    /**
-     * The topic name.
-     */
-    private final String topic;
-
-    /**
-     * The tid of this topic
-     */
-    private final String tid;
-    /**
-     * The serializer for the records sent to pulsar.
-     */
-    private final SerializationSchema<T> serializationSchema;
-
-    /**
-     * The tubemq producer.
-     */
-    private transient MessageProducer producer;
-
-    /**
-     * The tubemq session factory.
-     */
-    private transient MessageSessionFactory sessionFactory;
-
-    /**
-     * The maximum number of retries.
-     */
-    private final int maxRetries;
-
-    public TubemqSinkFunction(String topic,
-                              String masterAddress,
-                              SerializationSchema<T> serializationSchema,
-                              Configuration configuration) {
-        Preconditions.checkNotNull(topic,
-            "The topic must not be null.");
-        Preconditions.checkNotNull(masterAddress,
-            "The master address must not be null.");
-        Preconditions.checkNotNull(serializationSchema,
-            "The serialization schema must not be null.");
-        Preconditions.checkNotNull(configuration,
-            "The configuration must not be null.");
-
-        this.topic = topic;
-        this.masterAddress = masterAddress;
-        this.serializationSchema = serializationSchema;
-        this.tid = configuration.getString(TubemqOptions.TID);
-        this.maxRetries = configuration.getInteger(MAX_RETRIES);
-    }
-
-    @Override
-    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) 
{
-        // Nothing to do.
-    }
-
-    @Override
-    public void initializeState(FunctionInitializationContext 
functionInitializationContext) {
-        // Nothing to do.
-    }
-
-    @Override
-    public void open(Configuration parameters) throws Exception {
-        super.open(parameters);
-
-        TubeClientConfig tubeClientConfig = new 
TubeClientConfig(masterAddress);
-        this.sessionFactory = new TubeSingleSessionFactory(tubeClientConfig);
-        this.producer = sessionFactory.createProducer();
-        HashSet<String> hashSet = new HashSet<>();
-        hashSet.add(topic);
-        producer.publish(hashSet);
-    }
-
-    @Override
-    public void invoke(T in, Context context) throws Exception {
-
-        int retries = 0;
-        Exception exception = null;
-
-        while (maxRetries <= 0 || retries < maxRetries) {
-
-            try {
-                byte[] body = serializationSchema.serialize(in);
-                Message message = new Message(topic, body);
-                if (StringUtils.isNotBlank(tid)) {
-                    SimpleDateFormat sdf = new 
SimpleDateFormat(SYSTEM_HEADER_TIME_FORMAT);
-                    long currTimeMillis = System.currentTimeMillis();
-                    message.putSystemHeader(tid, sdf.format(new 
Date(currTimeMillis)));
-                }
-
-                MessageSentResult sendResult = producer.sendMessage(message);
-                if (sendResult.isSuccess()) {
-                    return;
-                } else {
-                    LOG.warn("Send msg fail, error code: {}, error message: 
{}",
-                        sendResult.getErrCode(), sendResult.getErrMsg());
-                }
-            } catch (Exception e) {
-                LOG.warn("Could not properly send the message to hippo " +
-                    "(retries: {}).", retries, e);
-
-                retries++;
-                exception = ExceptionUtils.firstOrSuppressed(e, exception);
-            }
-        }
-
-        throw new IOException("Could not properly send the message to hippo.", 
exception);
-    }
-
-    @Override
-    public void close() throws Exception {
-
-        try {
-            if (producer != null) {
-                producer.shutdown();
-                producer = null;
-            }
-            if (sessionFactory != null) {
-                sessionFactory.shutdown();
-                sessionFactory = null;
-            }
-        } catch (Throwable e) {
-            LOG.error("Shutdown producer error", e);
-        } finally {
-            super.close();
-        }
-    }
-}
diff --git 
a/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqTableSink.java
 
b/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqTableSink.java
deleted file mode 100644
index cf40879..0000000
--- 
a/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqTableSink.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connectors.tubemq;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-import java.util.Arrays;
-
-import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSink;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.table.sinks.AppendStreamTableSink;
-import org.apache.flink.table.utils.TableConnectorUtils;
-import org.apache.flink.types.Row;
-
-/**
- * Tubemq {@link org.apache.flink.table.sinks.StreamTableSink}.
- */
-public class TubemqTableSink implements AppendStreamTableSink<Row> {
-
-    /**
-     * Serialization schema for records to tubemq.
-     */
-    private final SerializationSchema<Row> serializationSchema;
-
-    /**
-     * The schema of the table.
-     */
-    private final TableSchema schema;
-
-    /**
-     * The tubemq topic name.
-     */
-    private final String topic;
-
-    /**
-     * The address of tubemq master, format eg: 127.0.0.1:8080,127.0.0.2:8081 .
-     */
-    private final String masterAddress;
-
-    /**
-     * The parameters collection for tubemq producer.
-     */
-    private final Configuration configuration;
-
-    public TubemqTableSink(
-        SerializationSchema<Row> serializationSchema,
-        TableSchema schema,
-        String topic,
-        String masterAddress,
-        Configuration configuration
-    ) {
-        this.serializationSchema = checkNotNull(serializationSchema,
-            "The deserialization schema must not be null.");
-        this.schema = checkNotNull(schema,
-            "The schema must not be null.");
-        this.topic = checkNotNull(topic,
-            "Topic must not be null.");
-        this.masterAddress = checkNotNull(masterAddress,
-            "Master address must not be null.");
-        this.configuration = checkNotNull(configuration,
-            "The configuration must not be null.");
-    }
-
-    @Override
-    public void emitDataStream(DataStream<Row> dataStream) {
-        consumeDataStream(dataStream);
-    }
-
-    @Override
-    public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {
-
-        final SinkFunction<Row> tubemqSinkFunction =
-            new TubemqSinkFunction<>(
-                topic,
-                masterAddress,
-                serializationSchema,
-                configuration
-            );
-
-        return dataStream
-            .addSink(tubemqSinkFunction)
-            .name(
-                TableConnectorUtils.generateRuntimeName(
-                    getClass(),
-                    getFieldNames()
-                )
-            );
-    }
-
-    @Override
-    public TypeInformation<Row> getOutputType() {
-        return schema.toRowType();
-    }
-
-    @Override
-    public String[] getFieldNames() {
-        return schema.getFieldNames();
-    }
-
-    @Override
-    public TypeInformation<?>[] getFieldTypes() {
-        return schema.getFieldTypes();
-    }
-
-    @Override
-    public TubemqTableSink configure(String[] fieldNames, TypeInformation<?>[] 
fieldTypes) {
-        if (!Arrays.equals(getFieldNames(), fieldNames)
-            || !Arrays.equals(getFieldTypes(), fieldTypes)) {
-            throw new ValidationException("Reconfiguration with different 
fields is not allowed. " +
-                "Expected: " + Arrays.toString(getFieldNames())
-                 + " / " + Arrays.toString(getFieldTypes()) + ". " +
-                 "But was: " + Arrays.toString(fieldNames) + " / " + 
Arrays.toString(fieldTypes));
-        }
-
-        return this;
-    }
-}
diff --git 
a/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqTableSourceSinkFactory.java
 
b/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqTableSourceFactory.java
similarity index 81%
rename from 
tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqTableSourceSinkFactory.java
rename to 
tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqTableSourceFactory.java
index fa006ad..760a763 100644
--- 
a/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqTableSourceSinkFactory.java
+++ 
b/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqTableSourceFactory.java
@@ -46,17 +46,13 @@ import java.util.Optional;
 import java.util.TreeSet;
 
 import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.descriptors.DescriptorProperties;
 import org.apache.flink.table.descriptors.SchemaValidator;
 import org.apache.flink.table.factories.DeserializationSchemaFactory;
-import org.apache.flink.table.factories.SerializationSchemaFactory;
-import org.apache.flink.table.factories.StreamTableSinkFactory;
 import org.apache.flink.table.factories.StreamTableSourceFactory;
 import org.apache.flink.table.factories.TableFactoryService;
-import org.apache.flink.table.sinks.StreamTableSink;
 import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
 import org.apache.flink.table.sources.StreamTableSource;
 import org.apache.flink.types.Row;
@@ -64,12 +60,11 @@ import org.apache.flink.types.Row;
 /**
  * Factory for creating configured instances of {@link TubemqTableSource}.
  */
-public class TubemqTableSourceSinkFactory implements 
StreamTableSourceFactory<Row>,
-    StreamTableSinkFactory<Row> {
+public class TubemqTableSourceFactory implements StreamTableSourceFactory<Row> 
{
 
     private static final String SPLIT_COMMA = ",";
 
-    private TubemqTableSourceSinkFactory() {
+    private TubemqTableSourceFactory() {
     }
 
     @Override
@@ -171,52 +166,6 @@ public class TubemqTableSourceSinkFactory implements 
StreamTableSourceFactory<Ro
         );
     }
 
-    @Override
-    public StreamTableSink<Row> createStreamTableSink(
-        Map<String, String> properties
-    ) {
-        final SerializationSchema<Row> serializationSchema =
-            getSerializationSchema(properties);
-
-        final DescriptorProperties descriptorProperties =
-            new DescriptorProperties(true);
-        descriptorProperties.putProperties(properties);
-
-        validateProperties(descriptorProperties);
-
-        final TableSchema tableSchema =
-            descriptorProperties.getTableSchema(SCHEMA);
-        final String topic =
-            descriptorProperties.getString(TubemqValidator.CONNECTOR_TOPIC);
-        final String masterAddress =
-            descriptorProperties.getString(TubemqValidator.CONNECTOR_MASTER);
-
-        final Configuration configuration =
-            getConfiguration(descriptorProperties);
-
-        return new TubemqTableSink(
-            serializationSchema,
-            tableSchema,
-            topic,
-            masterAddress,
-            configuration
-        );
-    }
-
-    private SerializationSchema<Row> getSerializationSchema(
-        Map<String, String> properties
-    ) {
-        @SuppressWarnings("unchecked")
-        final SerializationSchemaFactory<Row> formatFactory =
-            TableFactoryService.find(
-                SerializationSchemaFactory.class,
-                properties,
-                this.getClass().getClassLoader()
-            );
-
-        return formatFactory.createSerializationSchema(properties);
-    }
-
     private void validateProperties(DescriptorProperties descriptorProperties) 
{
         new SchemaValidator(true, false, false).validate(descriptorProperties);
         new TubemqValidator().validate(descriptorProperties);

Reply via email to