This is an automated email from the ASF dual-hosted git repository.
stoty pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix-connectors.git
The following commit(s) were added to refs/heads/master by this push:
new dd0be98 PHOENIX-6935 Remove Phoenix Kafka connector
dd0be98 is described below
commit dd0be98983bf3e6bb9ec47f5bebe2dee203d88cd
Author: Aron Meszaros <[email protected]>
AuthorDate: Mon Oct 9 09:52:40 2023 +0200
PHOENIX-6935 Remove Phoenix Kafka connector
---
README.md | 2 +-
phoenix5-connectors-assembly/pom.xml | 24 --
.../src/build/components/phoenix5-jars.xml | 8 -
phoenix5-kafka/pom.xml | 236 -----------------
.../apache/phoenix/kafka/PhoenixConsumerIT.java | 293 ---------------------
phoenix5-kafka/src/it/resources/consumer.props | 32 ---
phoenix5-kafka/src/it/resources/producer.props | 24 --
.../org/apache/phoenix/kafka/KafkaConstants.java | 52 ----
.../phoenix/kafka/consumer/PhoenixConsumer.java | 292 --------------------
.../kafka/consumer/PhoenixConsumerTool.java | 107 --------
pom.xml | 9 -
11 files changed, 1 insertion(+), 1078 deletions(-)
diff --git a/README.md b/README.md
index c7487ff..fec24b6 100644
--- a/README.md
+++ b/README.md
@@ -22,4 +22,4 @@ limitations under the License.
Copyright ©2019 [Apache Software Foundation](http://www.apache.org/). All
Rights Reserved.
## Introduction
-This repo contains the Flume, Kafka, Spark and Hive connectors for Phoenix.
\ No newline at end of file
+This repo contains the Flume, Spark and Hive connectors for Phoenix.
\ No newline at end of file
diff --git a/phoenix5-connectors-assembly/pom.xml b/phoenix5-connectors-assembly/pom.xml
index c6c4253..917ec5e 100644
--- a/phoenix5-connectors-assembly/pom.xml
+++ b/phoenix5-connectors-assembly/pom.xml
@@ -49,10 +49,6 @@
<groupId>org.apache.phoenix</groupId>
<artifactId>phoenix5-hive-shaded</artifactId>
</dependency>
- <dependency>
- <groupId>org.apache.phoenix</groupId>
- <artifactId>phoenix5-kafka</artifactId>
- </dependency>
<dependency>
<groupId>org.apache.phoenix</groupId>
<artifactId>phoenix5-spark</artifactId>
@@ -153,26 +149,6 @@
</arguments>
</configuration>
</execution>
- <execution>
- <id>kafka without version</id>
- <phase>package</phase>
- <goals>
- <goal>exec</goal>
- </goals>
- <configuration>
- <executable>ln</executable>
- <workingDirectory>${project.basedir}/../phoenix5-kafka/target</workingDirectory>
- <arguments>
- <argument>-fnsv</argument>
- <argument>
- phoenix5-kafka-${project.version}.jar
- </argument>
- <argument>
- phoenix5-kafka.jar
- </argument>
- </arguments>
- </configuration>
- </execution>
</executions>
</plugin>
<plugin>
diff --git a/phoenix5-connectors-assembly/src/build/components/phoenix5-jars.xml b/phoenix5-connectors-assembly/src/build/components/phoenix5-jars.xml
index b507a93..589c5d8 100644
--- a/phoenix5-connectors-assembly/src/build/components/phoenix5-jars.xml
+++ b/phoenix5-connectors-assembly/src/build/components/phoenix5-jars.xml
@@ -31,14 +31,6 @@
<include>phoenix5-flume.jar</include>
</includes>
</fileSet>
- <fileSet>
- <directory>${project.basedir}/../phoenix5-kafka/target</directory>
- <outputDirectory>/</outputDirectory>
- <includes>
- <include>phoenix5-kafka-${project.version}.jar</include>
- <include>phoenix5-kafka.jar</include>
- </includes>
- </fileSet>
<fileSet>
<directory>${project.basedir}/../phoenix5-spark-shaded/target</directory>
<outputDirectory>/</outputDirectory>
diff --git a/phoenix5-kafka/pom.xml b/phoenix5-kafka/pom.xml
deleted file mode 100644
index ecf9c7e..0000000
--- a/phoenix5-kafka/pom.xml
+++ /dev/null
@@ -1,236 +0,0 @@
-<?xml version='1.0'?>
-<!--
-
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
-
--->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.apache.phoenix</groupId>
- <artifactId>phoenix-connectors</artifactId>
- <version>6.0.0-SNAPSHOT</version>
- </parent>
- <artifactId>phoenix5-kafka</artifactId>
- <name>Phoenix Kafka Connector for Phoenix 5</name>
-
- <properties>
- <top.dir>${project.basedir}/..</top.dir>
- </properties>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.hbase</groupId>
- <artifactId>hbase-common</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- </dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- </dependency>
-
- <!-- To work with kafka with phoenix -->
- <dependency>
- <groupId>org.apache.phoenix</groupId>
- <artifactId>phoenix-hbase-compat-${hbase.compat.version}</artifactId>
- <scope>runtime</scope>
- <optional>true</optional>
- </dependency>
- <dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka-clients</artifactId>
- <version>${kafka.version}</version>
- </dependency>
- <dependency>
- <groupId>commons-cli</groupId>
- <artifactId>commons-cli</artifactId>
- <version>${commons-cli.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.phoenix</groupId>
- <artifactId>phoenix5-flume</artifactId>
- </dependency>
- <!-- Test dependencies -->
- <dependency>
- <groupId>org.apache.phoenix</groupId>
- <artifactId>phoenix-core</artifactId>
- <classifier>tests</classifier>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.phoenix.thirdparty</groupId>
- <artifactId>phoenix-shaded-guava</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka_2.11</artifactId>
- <version>${kafka.version}</version>
- <classifier>test</classifier>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hbase</groupId>
- <artifactId>hbase-it</artifactId>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-minicluster</artifactId>
- <scope>test</scope>
- </dependency>
-
- <!-- Flume dependencies, as the Kafka connectors extends Flume. -->
-<!-- <dependency> -->
-<!-- <groupId>org.apache.flume</groupId> -->
-<!-- <artifactId>flume-ng-core</artifactId> -->
-<!-- </dependency> -->
- <dependency>
- <groupId>org.apache.flume</groupId>
- <artifactId>flume-ng-sdk</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.flume</groupId>
- <artifactId>flume-ng-configuration</artifactId>
- </dependency>
- </dependencies>
-
- <build>
- <pluginManagement>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-dependency-plugin</artifactId>
- <configuration>
- <ignoreNonCompile>true</ignoreNonCompile>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>build-helper-maven-plugin</artifactId>
- <version>3.0.0</version>
- <executions>
- <execution>
- <id>add-parent-source</id>
- <phase>generate-sources</phase>
- <goals>
- <goal>add-source</goal>
- </goals>
- <configuration>
- <sources>
- <source>${project.parent.basedir}/src/main/java</source>
- </sources>
- </configuration>
- </execution>
- <execution>
- <id>add-parent-test-source</id>
- <phase>generate-sources</phase>
- <goals>
- <goal>add-test-source</goal>
- </goals>
- <configuration>
- <sources>
- <source>${project.parent.basedir}/src/it/java</source>
- <source>${project.parent.basedir}/src/it/resources</source>
- </sources>
- </configuration>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-failsafe-plugin</artifactId>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-site-plugin</artifactId>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-resources-plugin</artifactId>
- <executions>
- <execution>
- <id>copy-resources</id>
- <phase>generate-resources</phase>
- <goals>
- <goal>copy-resources</goal>
- </goals>
- <configuration>
- <outputDirectory>${project.build.directory}/test-classes
- </outputDirectory>
- <overwrite>true</overwrite>
- <resources>
- <resource>
- <directory>${project.parent.basedir}/src/it/resources</directory>
- </resource>
- </resources>
- </configuration>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-shade-plugin</artifactId>
- <configuration>
- <shadedArtifactAttached>true</shadedArtifactAttached>
- <promoteTransitiveDependencies>true</promoteTransitiveDependencies>
- <shadedClassifierName>shaded-minimal</shadedClassifierName>
- <shadeTestJar>false</shadeTestJar>
- <artifactSet>
- <includes>
- <include>org.apache.phoenix:phoenix5-kafka</include>
- <include>org.apache.kafka:kafka-clients</include>
- <include>org.apache.phoenix:phoenix5-flume</include>
- </includes>
- </artifactSet>
- </configuration>
- <executions>
- <execution>
- <phase>package</phase>
- <goals>
- <goal>shade</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </pluginManagement>
- <plugins>
- <plugin>
- <!-- Allows us to get the apache-ds bundle artifacts -->
- <groupId>org.apache.felix</groupId>
- <artifactId>maven-bundle-plugin</artifactId>
- <extensions>true</extensions>
- <inherited>true</inherited>
- </plugin>
- </plugins>
- </build>
-
-</project>
diff --git a/phoenix5-kafka/src/it/java/org/apache/phoenix/kafka/PhoenixConsumerIT.java b/phoenix5-kafka/src/it/java/org/apache/phoenix/kafka/PhoenixConsumerIT.java
deleted file mode 100644
index cb41008..0000000
--- a/phoenix5-kafka/src/it/java/org/apache/phoenix/kafka/PhoenixConsumerIT.java
+++ /dev/null
@@ -1,293 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.kafka;
-
-import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.file.Files;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.Map;
-import java.util.Properties;
-
-import org.I0Itec.zkclient.ZkClient;
-import org.apache.flume.Context;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
-import org.apache.phoenix.flume.DefaultKeyGenerator;
-import org.apache.phoenix.flume.FlumeConstants;
-import org.apache.phoenix.flume.serializer.EventSerializers;
-import org.apache.phoenix.kafka.consumer.PhoenixConsumer;
-import org.apache.phoenix.util.PropertiesUtil;
-import org.apache.phoenix.query.BaseTest;
-import org.apache.phoenix.query.QueryServices;
-import org.apache.phoenix.query.QueryServicesOptions;
-import org.apache.phoenix.util.ReadOnlyProps;
-import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-
-import kafka.admin.AdminUtils;
-import kafka.server.KafkaConfig;
-import kafka.server.KafkaServer;
-import kafka.utils.MockTime;
-import kafka.utils.TestUtils;
-import kafka.utils.Time;
-import kafka.utils.ZKStringSerializer$;
-import kafka.utils.ZkUtils;
-import kafka.zk.EmbeddedZookeeper;
-
-@Category(NeedsOwnMiniClusterTest.class)
-public class PhoenixConsumerIT extends BaseTest {
- private static final String ZKHOST = "127.0.0.1";
- private static final String BROKERHOST = "127.0.0.1";
- private static final String BROKERPORT = "9092";
- private static final String TOPIC = "topic1";
- private KafkaServer kafkaServer;
- private PhoenixConsumer pConsumer;
- private EmbeddedZookeeper zkServer;
- private ZkClient zkClient;
- private Connection conn;
-
- @BeforeClass
- public static void doSetup() throws Exception {
- Map<String,String> props = Maps.newHashMapWithExpectedSize(3);
- props.put(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB,
QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS);
- // Must update config before starting server
- setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
- }
-
- @Before
- public void setUp() throws Exception {
- setUpTestDriver(ReadOnlyProps.EMPTY_PROPS);
- // setup Zookeeper
- zkServer = new EmbeddedZookeeper();
- String zkConnect = ZKHOST + ":" + zkServer.port();
- zkClient = new ZkClient(zkConnect, 30000, 30000,
ZKStringSerializer$.MODULE$);
- ZkUtils zkUtils = ZkUtils.apply(zkClient, false);
-
- // setup Broker
- Properties brokerProps = new Properties();
- brokerProps.setProperty("zookeeper.connect", zkConnect);
- brokerProps.setProperty("broker.id", "0");
- brokerProps.setProperty("log.dirs",
- Files.createTempDirectory("kafka-").toAbsolutePath().toString());
- brokerProps.setProperty("listeners", "PLAINTEXT://" + BROKERHOST + ":"
+ BROKERPORT);
- KafkaConfig config = new KafkaConfig(brokerProps);
- Time mock = new MockTime();
- kafkaServer = TestUtils.createServer(config, mock);
- kafkaServer.startup();
-
- // create topic
- AdminUtils.createTopic(zkUtils, TOPIC, 1, 1, new Properties());
-
- pConsumer = new PhoenixConsumer();
-
- Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
- conn = DriverManager.getConnection(getUrl(), props);
- }
-
- @Test
- public void testPhoenixConsumerWithFile() throws SQLException {
- String consumerPath = "consumer.props";
- PhoenixConsumerThread pConsumerThread = new
PhoenixConsumerThread(pConsumer, consumerPath);
- pConsumerThread.properties.setProperty(FlumeConstants.CONFIG_JDBC_URL,
getUrl());
- Thread phoenixConsumer = new Thread(pConsumerThread);
-
- String producerPath = "producer.props";
- KafkaProducerThread kProducerThread = new
KafkaProducerThread(producerPath, TOPIC);
- Thread kafkaProducer = new Thread(kProducerThread);
-
- phoenixConsumer.start();
-
- try {
- phoenixConsumer.join(10000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
-
- kafkaProducer.start();
-
- try {
- kafkaProducer.join();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
-
- if (!kafkaProducer.isAlive()) {
- System.out.println("kafka producer is not alive");
- pConsumer.stop();
- }
-
- // Verify our serializer wrote out data
- ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM
SAMPLE1");
- assertTrue(rs.next());
- assertTrue(rs.getFetchSize() > 0);
- rs.close();
- }
-
- @Test
- public void testPhoenixConsumerWithProperties() throws SQLException {
-
- final String fullTableName = "SAMPLE2";
- final String ddl = "CREATE TABLE IF NOT EXISTS SAMPLE2(uid VARCHAR NOT
NULL,c1 VARCHAR,c2 VARCHAR,c3 VARCHAR CONSTRAINT pk PRIMARY KEY(uid))\n";
-
- Properties consumerProperties = new Properties();
- consumerProperties.setProperty(FlumeConstants.CONFIG_TABLE,
fullTableName);
- consumerProperties.setProperty(FlumeConstants.CONFIG_JDBC_URL,
getUrl());
- consumerProperties.setProperty(FlumeConstants.CONFIG_SERIALIZER,EventSerializers.REGEX.name());
- consumerProperties.setProperty(FlumeConstants.CONFIG_TABLE_DDL, ddl);
- consumerProperties.setProperty(FlumeConstants.CONFIG_SERIALIZER_PREFIX
+ FlumeConstants.CONFIG_REGULAR_EXPRESSION,"([^\\,]*),([^\\,]*),([^\\,]*)");
- consumerProperties.setProperty(FlumeConstants.CONFIG_SERIALIZER_PREFIX
+ FlumeConstants.CONFIG_COLUMN_NAMES,"c1,c2,c3");
- consumerProperties.setProperty(FlumeConstants.CONFIG_SERIALIZER_PREFIX
+ FlumeConstants.CONFIG_ROWKEY_TYPE_GENERATOR, DefaultKeyGenerator.UUID.name());
- consumerProperties.setProperty(KafkaConstants.BOOTSTRAP_SERVERS,
"localhost:9092");
- consumerProperties.setProperty(KafkaConstants.TOPICS, "topic1,topic2");
- consumerProperties.setProperty(KafkaConstants.TIMEOUT, "100");
-
- PhoenixConsumerThread pConsumerThread = new
PhoenixConsumerThread(pConsumer, consumerProperties);
- Thread phoenixConsumer = new Thread(pConsumerThread);
-
- Properties producerProperties = new Properties();
- producerProperties.setProperty(KafkaConstants.BOOTSTRAP_SERVERS,
"localhost:9092");
- producerProperties.setProperty(KafkaConstants.KEY_SERIALIZER,
KafkaConstants.DEFAULT_KEY_SERIALIZER);
- producerProperties.setProperty(KafkaConstants.VALUE_SERIALIZER,
KafkaConstants.DEFAULT_VALUE_SERIALIZER);
- producerProperties.setProperty("auto.commit.interval.ms", "1000");
-
- KafkaProducerThread kProducerThread = new
KafkaProducerThread(producerProperties, TOPIC);
- Thread kafkaProducer = new Thread(kProducerThread);
-
- phoenixConsumer.start();
-
- try {
- phoenixConsumer.join(10000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
-
- kafkaProducer.start();
-
- try {
- kafkaProducer.join();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
-
- if (!kafkaProducer.isAlive()) {
- System.out.println("kafka producer is not alive");
- pConsumer.stop();
- }
-
- // Verify our serializer wrote out data
- ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM
SAMPLE2");
- assertTrue(rs.next());
- assertTrue(rs.getFetchSize() > 0);
- rs.close();
- }
-
- @After
- public void cleanUp() throws Exception {
- kafkaServer.shutdown();
- zkClient.close();
- zkServer.shutdown();
- conn.close();
- }
-
- class PhoenixConsumerThread implements Runnable {
- PhoenixConsumer pConsumer;
- Properties properties;
-
- PhoenixConsumerThread(PhoenixConsumer pConsumer, String path) {
- this.pConsumer = pConsumer;
- try (InputStream props =
getClass().getClassLoader().getResourceAsStream(path)) {
- Properties properties = new Properties();
- properties.load(props);
- this.properties = properties;
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
-
- PhoenixConsumerThread(PhoenixConsumer pConsumer, Properties
properties) {
- this.pConsumer = pConsumer;
- this.properties = properties;
- }
-
- @Override
- public void run() {
- // intialize the kafka
- pConsumer.intializeKafka(properties);
-
- // configure the phoenix
- Context context = pConsumer.prepareContext();
- pConsumer.configure(context);
-
- // start the kafka consumer
- pConsumer.start();
-
- // process kafka messages
- pConsumer.process();
- }
- }
-
- class KafkaProducerThread implements Runnable {
- KafkaProducer<String, String> producer;
- String topic;
-
- KafkaProducerThread(String path, String topic) {
- this.topic = topic;
- try (InputStream props =
getClass().getClassLoader().getResourceAsStream(path)) {
- Properties properties = new Properties();
- properties.load(props);
- producer = new KafkaProducer<>(properties);
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
-
- KafkaProducerThread(Properties properties, String topic) {
- this.topic = topic;
- producer = new KafkaProducer<>(properties);
- }
-
- @Override
- public void run() {
- try {
- for (int i = 1; i <= 10; i++) {
- String message = String.format("%s,%.3f,%d", "msg" + i, i
* 2000f, i);
- producer.send(new ProducerRecord<String, String>(topic,
message));
- producer.flush();
- Thread.sleep(100);
- }
- } catch (Throwable throwable) {
- System.out.printf("%s", throwable.fillInStackTrace());
- } finally {
- producer.close();
- }
- }
- }
-}
diff --git a/phoenix5-kafka/src/it/resources/consumer.props b/phoenix5-kafka/src/it/resources/consumer.props
deleted file mode 100644
index 703fd7c..0000000
--- a/phoenix5-kafka/src/it/resources/consumer.props
+++ /dev/null
@@ -1,32 +0,0 @@
-############################################################################
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-############################################################################
-
-serializer=regex
-serializer.rowkeyType=uuid
-serializer.regex=([^\,]*),([^\,]*),([^\,]*)
-serializer.columns=c1,c2,c3
-
-jdbcUrl=jdbc:phoenix:localhost
-table=SAMPLE1
-ddl=CREATE TABLE IF NOT EXISTS SAMPLE1(uid VARCHAR NOT NULL,c1 VARCHAR,c2
VARCHAR,c3 VARCHAR CONSTRAINT pk PRIMARY KEY(uid))
-
-bootstrap.servers=localhost:9092
-topics=topic1,topic2
-poll.timeout.ms=100
diff --git a/phoenix5-kafka/src/it/resources/producer.props b/phoenix5-kafka/src/it/resources/producer.props
deleted file mode 100644
index 4c3cd2f..0000000
--- a/phoenix5-kafka/src/it/resources/producer.props
+++ /dev/null
@@ -1,24 +0,0 @@
-############################################################################
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-############################################################################
-
-bootstrap.servers=localhost:9092
-auto.commit.interval.ms=1000
-key.serializer=org.apache.kafka.common.serialization.StringSerializer
-value.serializer=org.apache.kafka.common.serialization.StringSerializer
diff --git a/phoenix5-kafka/src/main/java/org/apache/phoenix/kafka/KafkaConstants.java b/phoenix5-kafka/src/main/java/org/apache/phoenix/kafka/KafkaConstants.java
deleted file mode 100644
index cc1aa61..0000000
--- a/phoenix5-kafka/src/main/java/org/apache/phoenix/kafka/KafkaConstants.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.kafka;
-
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-
-public final class KafkaConstants {
-
- public static final String BOOTSTRAP_SERVERS =
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG;
-
- public static final String KEY_SERIALIZER =
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
-
- public static final String VALUE_SERIALIZER =
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
-
- public static final String DEFAULT_KEY_SERIALIZER =
StringSerializer.class.getName();
-
- public static final String DEFAULT_VALUE_SERIALIZER =
StringSerializer.class.getName();
-
- public static final String KEY_DESERIALIZER =
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
-
- public static final String VALUE_DESERIALIZER =
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
-
- public static final String DEFAULT_KEY_DESERIALIZER =
StringDeserializer.class.getName();
-
- public static final String DEFAULT_VALUE_DESERIALIZER =
StringDeserializer.class.getName();
-
- public static final String TOPICS = "topics";
-
- public static final String GROUP_ID = ConsumerConfig.GROUP_ID_CONFIG;
-
- public static final String TIMEOUT = "poll.timeout.ms";
-
- public static final long DEFAULT_TIMEOUT = 100;
-}
diff --git a/phoenix5-kafka/src/main/java/org/apache/phoenix/kafka/consumer/PhoenixConsumer.java b/phoenix5-kafka/src/main/java/org/apache/phoenix/kafka/consumer/PhoenixConsumer.java
deleted file mode 100644
index 6551386..0000000
--- a/phoenix5-kafka/src/main/java/org/apache/phoenix/kafka/consumer/PhoenixConsumer.java
+++ /dev/null
@@ -1,292 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.kafka.consumer;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Properties;
-import java.util.Random;
-
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.event.EventBuilder;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.phoenix.flume.FlumeConstants;
-import org.apache.phoenix.flume.serializer.EventSerializer;
-import org.apache.phoenix.flume.serializer.EventSerializers;
-import org.apache.phoenix.kafka.KafkaConstants;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-public class PhoenixConsumer {
- private static final Logger logger =
LoggerFactory.getLogger(PhoenixConsumer.class);
-
- private KafkaConsumer<String, String> consumer = null;
- private Properties properties = new Properties();
- private Integer batchSize;
- private long timeout;
- private EventSerializer serializer;
- private Boolean process = true;
-
- public PhoenixConsumer() {
-
- }
-
- public PhoenixConsumer(Configuration conf) throws IOException {
- // intialize the kafka
- intializeKafka(conf);
-
- // configure the phoenix
- Context context = prepareContext();
- configure(context);
-
- // start the kafka consumer
- start();
-
- // process kafka messages
- process();
- }
-
- /**
- * Initializes the kafka with properties file.
- * @param conf
- * @throws IOException
- */
- public void intializeKafka(Configuration conf) throws IOException {
- // get the kafka consumer file
- String file = conf.get("kafka.consumer.file");
- if(file==null){
- throw new NullPointerException("File path cannot be empty, please
specify in the arguments");
- }
-
- Path path = new Path(file);
- FileSystem fs = FileSystem.get(conf);
- try (InputStream props = fs.open(path)) {
- properties.load(props);
- } catch (IOException e) {
- e.printStackTrace();
- }
-
- intializeKafka(properties);
- }
-
- /**
- * Initializes the kafka with properties.
- * @param properties
- */
- public void intializeKafka(Properties properties) {
- this.properties = properties;
-
- String servers =
properties.getProperty(KafkaConstants.BOOTSTRAP_SERVERS);
- if(servers ==null){
- throw new NullPointerException("Bootstrap Servers cannot be empty,
please specify in the configuration file");
- }
- properties.setProperty(KafkaConstants.BOOTSTRAP_SERVERS, servers);
-
- if (properties.getProperty(KafkaConstants.GROUP_ID) == null) {
- properties.setProperty(KafkaConstants.GROUP_ID, "group-" + new
Random().nextInt(100000));
- }
-
- if (properties.getProperty(KafkaConstants.TIMEOUT) == null) {
- properties.setProperty(KafkaConstants.TIMEOUT,
String.valueOf(KafkaConstants.DEFAULT_TIMEOUT));
- }
-
- String topics = properties.getProperty(KafkaConstants.TOPICS);
-
- if (topics == null) {
- throw new NullPointerException("Topics cannot be empty, please
specify in the configuration file");
- }
-
- properties.setProperty(KafkaConstants.KEY_DESERIALIZER,
KafkaConstants.DEFAULT_KEY_DESERIALIZER);
-
- properties.setProperty(KafkaConstants.VALUE_DESERIALIZER,
KafkaConstants.DEFAULT_VALUE_DESERIALIZER);
-
- this.consumer = new KafkaConsumer<>(properties);
- consumer.subscribe(Arrays.asList(topics.split(",")));
- }
-
- /**
- * Convert the properties to context
- */
- public Context prepareContext() {
- Map<String, String> map = new HashMap<String, String>();
- for (Entry<Object, Object> entry : properties.entrySet()) {
- map.put((String) entry.getKey(), (String) entry.getValue());
- }
- return new Context(map);
- }
-
- /**
- * Configure the context
- */
- public void configure(Context context){
- this.timeout = context.getLong(KafkaConstants.TIMEOUT,
KafkaConstants.DEFAULT_TIMEOUT);
- this.batchSize = context.getInteger(FlumeConstants.CONFIG_BATCHSIZE,
FlumeConstants.DEFAULT_BATCH_SIZE);
- final String eventSerializerType =
context.getString(FlumeConstants.CONFIG_SERIALIZER);
- if (eventSerializerType ==null){
- throw new NullPointerException("Event serializer cannot be empty,
please specify in the configuration file");
- }
- initializeSerializer(context,eventSerializerType);
- }
-
- /**
- * Process the kafka messages
- */
- public void process() {
- int timeouts = 0;
- // noinspection InfiniteLoopStatement
- while (process) {
- // read records with a short timeout.
- // If we time out, we don't really care.
- // Assuming only key & value text data
- ConsumerRecords<String, String> records =
consumer.poll(this.timeout);
- if (records.count() == 0) {
- timeouts++;
- } else {
- System.out.printf("Got %d records after %d timeouts\n",
records.count(), timeouts);
- timeouts = 0;
- }
-
- if (!records.isEmpty()) {
- List<Event> events = new ArrayList<>(records.count());
- for (ConsumerRecord<String, String> record : records) {
- Event event =
EventBuilder.withBody(Bytes.toBytes(record.value()));
- events.add(event);
- }
- // save to Hbase
- try {
- serializer.upsertEvents(events);
- } catch (SQLException e) {
- e.printStackTrace();
- }
- }
- }
- }
-
- /**
- * start the serializer
- */
- public void start() {
- logger.info("Starting consumer {} ", this.getClass());
- try {
- serializer.initialize();
- } catch (Exception ex) {
- logger.error("Error {} in initializing the serializer.",
ex.getMessage());
- if (ex instanceof RuntimeException){
- throw RuntimeException.class.cast(ex);
- }
- else {
- throw new RuntimeException(ex);
- }
- }
- }
-
- /**
- * stop the consumer and serializer
- */
- public void stop() {
- this.close();
- consumer.close();
- try {
- serializer.close();
- } catch (SQLException e) {
- logger.error(" Error while closing connection {} for consumer.",
e.getMessage());
- }
- }
-
- /**
- * make the changes to stop in gracefully
- */
- public void close(){
- this.process = false;
- try {
- Thread.sleep(30000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
-
- /**
- * Initializes the serializer for kafka messages.
- * @param context
- * @param eventSerializerType
- */
- private void initializeSerializer(final Context context, final String
eventSerializerType) {
- String serializerClazz = null;
- EventSerializers eventSerializer = null;
-
- try {
- eventSerializer =
EventSerializers.valueOf(eventSerializerType.toUpperCase());
- } catch (IllegalArgumentException iae) {
- serializerClazz = eventSerializerType;
- }
-
- final Context serializerContext = new Context();
-
serializerContext.putAll(context.getSubProperties(FlumeConstants.CONFIG_SERIALIZER_PREFIX));
- copyPropertiesToSerializerContext(context,serializerContext);
-
- try {
- @SuppressWarnings("unchecked")
- Class<? extends EventSerializer> clazz = null;
- if (serializerClazz == null) {
- clazz = (Class<? extends EventSerializer>)
Class.forName(eventSerializer.getClassName());
- } else {
- clazz = (Class<? extends EventSerializer>)
Class.forName(serializerClazz);
- }
-
- serializer = clazz.newInstance();
- serializer.configure(serializerContext);
- } catch (Exception e) {
- logger.error("Could not instantiate event serializer.", e);
- if (e instanceof RuntimeException){
- throw (RuntimeException)e;
- }
- else {
- throw new RuntimeException(e);
- }
- }
- }
-
- /**
- * Copy properties to serializer context.
- * @param context
- * @param serializerContext
- */
- private void copyPropertiesToSerializerContext(Context context, Context
serializerContext) {
-
serializerContext.put(FlumeConstants.CONFIG_TABLE_DDL,context.getString(FlumeConstants.CONFIG_TABLE_DDL));
-
serializerContext.put(FlumeConstants.CONFIG_TABLE,context.getString(FlumeConstants.CONFIG_TABLE));
-
serializerContext.put(FlumeConstants.CONFIG_ZK_QUORUM,context.getString(FlumeConstants.CONFIG_ZK_QUORUM));
-
serializerContext.put(FlumeConstants.CONFIG_JDBC_URL,context.getString(FlumeConstants.CONFIG_JDBC_URL));
-
serializerContext.put(FlumeConstants.CONFIG_BATCHSIZE,context.getString(FlumeConstants.CONFIG_BATCHSIZE));
- }
-
-}
diff --git
a/phoenix5-kafka/src/main/java/org/apache/phoenix/kafka/consumer/PhoenixConsumerTool.java
b/phoenix5-kafka/src/main/java/org/apache/phoenix/kafka/consumer/PhoenixConsumerTool.java
deleted file mode 100644
index 8c10aa5..0000000
---
a/phoenix5-kafka/src/main/java/org/apache/phoenix/kafka/consumer/PhoenixConsumerTool.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.kafka.consumer;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-import org.apache.commons.cli.PosixParser;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class PhoenixConsumerTool extends Configured implements Tool {
- private static final Logger logger =
LoggerFactory.getLogger(PhoenixConsumerTool.class);
- static final Option FILE_PATH_OPT = new Option("f", "file", true, "input
file path");
- static final Option HELP_OPT = new Option("h", "help", false, "Show this
help and quit");
-
- public static Options getOptions() {
- Options options = new Options();
- options.addOption(FILE_PATH_OPT);
- options.addOption(HELP_OPT);
- return options;
- }
-
- public static CommandLine parseOptions(String[] args) {
-
- Options options = getOptions();
-
- CommandLineParser parser = new PosixParser();
- CommandLine cmdLine = null;
- try {
- cmdLine = parser.parse(options, args);
- } catch (ParseException e) {
- printHelpAndExit("Error parsing command line options: " +
e.getMessage(), options);
- }
-
- if (cmdLine.hasOption(HELP_OPT.getOpt())) {
- printHelpAndExit(options, 0);
- }
-
- if (!cmdLine.hasOption(FILE_PATH_OPT.getOpt())) {
- throw new IllegalStateException(FILE_PATH_OPT.getLongOpt() + " is
a mandatory " + "parameter");
- }
-
- if (!cmdLine.getArgList().isEmpty()) {
- throw new IllegalStateException("Got unexpected extra parameters:
" + cmdLine.getArgList());
- }
-
- return cmdLine;
- }
-
- public static void printHelpAndExit(String errorMessage, Options options) {
- System.err.println(errorMessage);
- printHelpAndExit(options, 1);
- }
-
- public static void printHelpAndExit(Options options, int exitCode) {
- HelpFormatter formatter = new HelpFormatter();
- formatter.printHelp("help", options);
- System.exit(exitCode);
- }
-
- @Override
- public int run(String[] args) throws Exception {
- Configuration conf = HBaseConfiguration.create(getConf());
-
- CommandLine cmdLine = null;
- try {
- cmdLine = parseOptions(args);
- } catch (IllegalStateException e) {
- printHelpAndExit(e.getMessage(), getOptions());
- }
-
- String path = cmdLine.getOptionValue(FILE_PATH_OPT.getOpt());
- conf.set("kafka.consumer.file", path);
- new PhoenixConsumer(conf);
-
- return 1;
- }
-
- public static void main(String[] args) throws Exception {
- int exitStatus = ToolRunner.run(new PhoenixConsumerTool(), args);
- System.exit(exitStatus);
- }
-}
diff --git a/pom.xml b/pom.xml
index b4c4cb8..b1a590a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -46,7 +46,6 @@
<module>phoenix5-hive</module>
<module>phoenix5-hive-shaded</module>
<module>phoenix5-flume</module>
- <module>phoenix5-kafka</module>
<module>phoenix5-spark</module>
<module>phoenix5-spark-shaded</module>
<module>phoenix5-spark3</module>
@@ -100,7 +99,6 @@
<hive3-storage.version>2.7.0</hive3-storage.version>
<hive-storage.version>${hive3-storage.version}</hive-storage.version>
<flume.version>1.4.0</flume.version>
- <kafka.version>0.9.0.0</kafka.version>
<spark.version>2.4.0</spark.version>
<spark3.version>3.0.3</spark3.version>
<scala.version>2.11.12</scala.version>
@@ -118,8 +116,6 @@
<jodatime.version>2.10.5</jodatime.version>
<commons-cli.version>1.4</commons-cli.version>
<commons-compress.version>1.9</commons-compress.version>
- <!-- For Kafka -->
- <com-101tek-zkclient.version>0.7</com-101tek-zkclient.version>
<!-- For hive -->
<commons-io.version>2.11.0</commons-io.version>
@@ -515,11 +511,6 @@
<artifactId>phoenix5-flume</artifactId>
<version>${project.version}</version>
</dependency>
- <dependency>
- <groupId>org.apache.phoenix</groupId>
- <artifactId>phoenix5-kafka</artifactId>
- <version>${project.version}</version>
- </dependency>
<dependency>
<groupId>org.apache.phoenix</groupId>
<artifactId>phoenix5-spark</artifactId>