This is an automated email from the ASF dual-hosted git repository.
stoty pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix-connectors.git
The following commit(s) were added to refs/heads/master by this push:
new 062b34c PHOENIX-6941 Remove Phoenix Flume connector
062b34c is described below
commit 062b34cfa7ce39af22aa09b6cb12f19ce61d17ac
Author: Aron Meszaros <[email protected]>
AuthorDate: Mon Oct 9 17:18:01 2023 +0200
PHOENIX-6941 Remove Phoenix Flume connector
---
README.md | 2 +-
phoenix5-connectors-assembly/pom.xml | 24 -
.../src/build/components/phoenix5-jars.xml | 10 -
phoenix5-flume/pom.xml | 177 -------
.../apache/phoenix/flume/CsvEventSerializerIT.java | 451 ----------------
.../phoenix/flume/JsonEventSerializerIT.java | 575 ---------------------
.../org/apache/phoenix/flume/PhoenixSinkIT.java | 290 -----------
.../phoenix/flume/RegexEventSerializerIT.java | 447 ----------------
.../phoenix/flume/serializer/CustomSerializer.java | 43 --
.../apache/phoenix/flume/sink/NullPhoenixSink.java | 21 -
.../apache/phoenix/flume/DefaultKeyGenerator.java | 69 ---
.../org/apache/phoenix/flume/FlumeConstants.java | 94 ----
.../org/apache/phoenix/flume/KeyGenerator.java | 24 -
.../org/apache/phoenix/flume/SchemaHandler.java | 48 --
.../flume/serializer/BaseEventSerializer.java | 252 ---------
.../flume/serializer/CsvEventSerializer.java | 209 --------
.../phoenix/flume/serializer/EventSerializer.java | 42 --
.../phoenix/flume/serializer/EventSerializers.java | 36 --
.../flume/serializer/JsonEventSerializer.java | 232 ---------
.../flume/serializer/RegexEventSerializer.java | 150 ------
.../org/apache/phoenix/flume/sink/PhoenixSink.java | 221 --------
pom.xml | 40 --
22 files changed, 1 insertion(+), 3456 deletions(-)
diff --git a/README.md b/README.md
index fec24b6..cb1c4ba 100644
--- a/README.md
+++ b/README.md
@@ -22,4 +22,4 @@ limitations under the License.
Copyright ©2019 [Apache Software Foundation](http://www.apache.org/). All
Rights Reserved.
## Introduction
-This repo contains the Flume, Spark and Hive connectors for Phoenix.
\ No newline at end of file
+This repo contains the Spark and Hive connectors for Phoenix.
\ No newline at end of file
diff --git a/phoenix5-connectors-assembly/pom.xml b/phoenix5-connectors-assembly/pom.xml
index 917ec5e..ed8a56d 100644
--- a/phoenix5-connectors-assembly/pom.xml
+++ b/phoenix5-connectors-assembly/pom.xml
@@ -37,10 +37,6 @@
</properties>
<dependencies>
- <dependency>
- <groupId>org.apache.phoenix</groupId>
- <artifactId>phoenix5-flume</artifactId>
- </dependency>
<dependency>
<groupId>org.apache.phoenix</groupId>
<artifactId>phoenix5-hive</artifactId>
@@ -69,26 +65,6 @@
<artifactId>exec-maven-plugin</artifactId>
<groupId>org.codehaus.mojo</groupId>
<executions>
- <execution>
- <id>flume without version</id>
- <phase>package</phase>
- <goals>
- <goal>exec</goal>
- </goals>
- <configuration>
- <executable>ln</executable>
- <workingDirectory>${project.basedir}/../phoenix5-flume/target</workingDirectory>
- <arguments>
- <argument>-fnsv</argument>
- <argument>
- phoenix5-flume-${project.version}.jar
- </argument>
- <argument>
- phoenix5-flume.jar
- </argument>
- </arguments>
- </configuration>
- </execution>
<execution>
<id>hive without version</id>
<phase>package</phase>
diff --git a/phoenix5-connectors-assembly/src/build/components/phoenix5-jars.xml b/phoenix5-connectors-assembly/src/build/components/phoenix5-jars.xml
index 589c5d8..ac02e2f 100644
--- a/phoenix5-connectors-assembly/src/build/components/phoenix5-jars.xml
+++ b/phoenix5-connectors-assembly/src/build/components/phoenix5-jars.xml
@@ -21,16 +21,6 @@
-->
<component>
<fileSets>
- <fileSet>
- <!-- We don't actually have shaded flume JARs ATM,
- but there's no point in packaging the unshaded ones. -->
- <directory>${project.basedir}/../phoenix5-flume/target</directory>
- <outputDirectory>/</outputDirectory>
- <includes>
- <include>phoenix5-flume-${project.version}.jar</include>
- <include>phoenix5-flume.jar</include>
- </includes>
- </fileSet>
<fileSet>
<directory>${project.basedir}/../phoenix5-spark-shaded/target</directory>
<outputDirectory>/</outputDirectory>
diff --git a/phoenix5-flume/pom.xml b/phoenix5-flume/pom.xml
deleted file mode 100644
index 8623794..0000000
--- a/phoenix5-flume/pom.xml
+++ /dev/null
@@ -1,177 +0,0 @@
-<?xml version='1.0'?>
-<!--
-
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
-
--->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.apache.phoenix</groupId>
- <artifactId>phoenix-connectors</artifactId>
- <version>6.0.0-SNAPSHOT</version>
- </parent>
- <artifactId>phoenix5-flume</artifactId>
- <name>Phoenix Flume Connector for Phoenix 5</name>
-
- <properties>
- <top.dir>${project.basedir}/..</top.dir>
- </properties>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.phoenix</groupId>
- <artifactId>phoenix-core</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.phoenix</groupId>
- <artifactId>phoenix-hbase-compat-${hbase.compat.version}</artifactId>
- <scope>runtime</scope>
- <optional>true</optional>
- </dependency>
-
- <dependency>
- <groupId>commons-collections</groupId>
- <artifactId>commons-collections</artifactId>
- <version>${commons-collections.version}</version>
- </dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- </dependency>
-
- <dependency>
- <groupId>com.tdunning</groupId>
- <artifactId>json</artifactId>
- <version>1.8</version>
- </dependency>
- <dependency>
- <groupId>com.jayway.jsonpath</groupId>
- <artifactId>json-path</artifactId>
- <version>2.2.0</version>
- </dependency>
- <dependency>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-csv</artifactId>
- <version>${commons-csv.version}</version>
- </dependency>
-
- <!-- Test Dependencies -->
- <dependency>
- <groupId>org.apache.hbase</groupId>
- <artifactId>hbase-it</artifactId>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-minicluster</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- <scope>test</scope>
- </dependency>
-
- <!-- Main dependency on flume. The last to avoid using old commons-io
- in IT -->
- <dependency>
- <groupId>org.apache.flume</groupId>
- <artifactId>flume-ng-core</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.flume</groupId>
- <artifactId>flume-ng-sdk</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.flume</groupId>
- <artifactId>flume-ng-configuration</artifactId>
- </dependency>
- </dependencies>
-
- <build>
- <pluginManagement>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-dependency-plugin</artifactId>
- <configuration>
- <!-- AFAICT these are bogus dependency problems -->
- <ignoredUnusedDeclaredDependencies>
- <ignoredUnusedDeclaredDependency>
- org.apache.hbase:hbase-it
- </ignoredUnusedDeclaredDependency>
- <ignoredUnusedDeclaredDependency>
- org.apache.hadoop:hadoop-minicluster
- </ignoredUnusedDeclaredDependency>
- <ignoredUnusedDeclaredDependency>
- org.apache.hadoop:hadoop-common
- </ignoredUnusedDeclaredDependency>
- <ignoredUnusedDeclaredDependency>
- org.apache.phoenix:phoenix-hbase-compat-${hbase.compat.version}
- </ignoredUnusedDeclaredDependency>
- </ignoredUnusedDeclaredDependencies>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>build-helper-maven-plugin</artifactId>
- <version>3.0.0</version>
- <executions>
- <execution>
- <id>add-parent-source</id>
- <phase>generate-sources</phase>
- <goals>
- <goal>add-source</goal>
- </goals>
- <configuration>
- <sources>
- <source>${project.parent.basedir}/src/main/java</source>
- </sources>
- </configuration>
- </execution>
- <execution>
- <id>add-parent-test-source</id>
- <phase>generate-sources</phase>
- <goals>
- <goal>add-test-source</goal>
- </goals>
- <configuration>
- <sources>
- <source>${project.parent.basedir}/src/it/java</source>
- </sources>
- </configuration>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-failsafe-plugin</artifactId>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-resources-plugin</artifactId>
- </plugin>
- </plugins>
- </pluginManagement>
- </build>
-</project>
diff --git a/phoenix5-flume/src/it/java/org/apache/phoenix/flume/CsvEventSerializerIT.java b/phoenix5-flume/src/it/java/org/apache/phoenix/flume/CsvEventSerializerIT.java
deleted file mode 100644
index 1db9d2f..0000000
--- a/phoenix5-flume/src/it/java/org/apache/phoenix/flume/CsvEventSerializerIT.java
+++ /dev/null
@@ -1,451 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.flume;
-
-import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-import org.apache.flume.Channel;
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.EventDeliveryException;
-import org.apache.flume.Transaction;
-import org.apache.flume.channel.MemoryChannel;
-import org.apache.flume.conf.Configurables;
-import org.apache.flume.event.EventBuilder;
-import org.apache.flume.lifecycle.LifecycleState;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
-import org.apache.phoenix.flume.serializer.EventSerializers;
-import org.apache.phoenix.flume.sink.PhoenixSink;
-import org.apache.phoenix.util.PropertiesUtil;
-import org.apache.phoenix.util.ReadOnlyProps;
-import org.apache.phoenix.query.BaseTest;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-
-@Category(NeedsOwnMiniClusterTest.class)
-public class CsvEventSerializerIT extends BaseTest {
-
- private Context sinkContext;
- private PhoenixSink sink;
-
- @BeforeClass
- public static synchronized void doSetup() throws Exception {
- setUpTestDriver(ReadOnlyProps.EMPTY_PROPS);
- }
-
- @AfterClass
- public static synchronized void doTeardown() throws Exception {
- dropNonSystemTables();
- }
-
- @After
- public void cleanUpAfterTest() throws Exception {
- deletePriorMetaData(HConstants.LATEST_TIMESTAMP, getUrl());
- }
-
- @Test
- public void testWithDefaultDelimiters() throws EventDeliveryException,
SQLException {
-
- final String fullTableName = "FLUME_CSV_TEST";
-
- String ddl = "CREATE TABLE IF NOT EXISTS " + fullTableName
- + " (flume_time timestamp not null, col1
varchar , col2 double, col3 varchar[], col4 integer[]"
- + " CONSTRAINT pk PRIMARY KEY (flume_time))\n";
- String columns = "col1,col2,col3,col4";
- String rowkeyType = DefaultKeyGenerator.TIMESTAMP.name();
- initSinkContext(fullTableName, ddl, columns, null, null, null,
null, rowkeyType, null);
-
- sink = new PhoenixSink();
- Configurables.configure(sink, sinkContext);
-
- assertEquals(LifecycleState.IDLE, sink.getLifecycleState());
-
- final Channel channel = this.initChannel();
- sink.setChannel(channel);
-
- sink.start();
-
- final String eventBody =
"kalyan,10.5,\"abc,pqr,xyz\",\"1,2,3,4\"";
- final Event event =
EventBuilder.withBody(Bytes.toBytes(eventBody));
- // put event in channel
- Transaction transaction = channel.getTransaction();
- transaction.begin();
- channel.put(event);
- transaction.commit();
- transaction.close();
-
- sink.process();
-
- int rowsInDb = countRows(fullTableName);
- assertEquals(1, rowsInDb);
-
- sink.stop();
- assertEquals(LifecycleState.STOP, sink.getLifecycleState());
-
- dropTable(fullTableName);
- }
-
- @Test
- public void testKeyGenerator() throws EventDeliveryException,
SQLException {
-
- final String fullTableName = "FLUME_CSV_TEST";
- initSinkContextWithDefaults(fullTableName);
-
- sink = new PhoenixSink();
- Configurables.configure(sink, sinkContext);
-
- assertEquals(LifecycleState.IDLE, sink.getLifecycleState());
-
- final Channel channel = this.initChannel();
- sink.setChannel(channel);
-
- sink.start();
- final String eventBody =
"kalyan,10.5,\"abc,pqr,xyz\",\"1,2,3,4\"";
- final Event event =
EventBuilder.withBody(Bytes.toBytes(eventBody));
- // put event in channel
- Transaction transaction = channel.getTransaction();
- transaction.begin();
- channel.put(event);
- transaction.commit();
- transaction.close();
-
- sink.process();
-
- int rowsInDb = countRows(fullTableName);
- assertEquals(1, rowsInDb);
-
- sink.stop();
- assertEquals(LifecycleState.STOP, sink.getLifecycleState());
-
- dropTable(fullTableName);
- }
-
- @Test
- public void testMismatchKeyGenerator() throws EventDeliveryException,
SQLException {
-
- final String fullTableName = "FLUME_CSV_TEST";
- initSinkContextWithDefaults(fullTableName);
- setConfig(FlumeConstants.CONFIG_SERIALIZER_PREFIX +
FlumeConstants.CONFIG_ROWKEY_TYPE_GENERATOR,
- DefaultKeyGenerator.UUID.name());
-
- sink = new PhoenixSink();
- Configurables.configure(sink, sinkContext);
- assertEquals(LifecycleState.IDLE, sink.getLifecycleState());
-
- final Channel channel = this.initChannel();
- sink.setChannel(channel);
-
- sink.start();
- final String eventBody =
"kalyan,10.5,\"abc,pqr,xyz\",\"1,2,3,4\"";
- final Event event =
EventBuilder.withBody(Bytes.toBytes(eventBody));
- // put event in channel
- Transaction transaction = channel.getTransaction();
- transaction.begin();
- channel.put(event);
- transaction.commit();
- transaction.close();
-
- try {
- sink.process();
- fail();
- } catch (Exception ex) {
-
assertTrue(ex.getCause().getMessage().contains("java.lang.IllegalArgumentException:
Invalid format:"));
- }
-
- dropTable(fullTableName);
- }
-
- @Test
- public void testMissingColumnsInEvent() throws EventDeliveryException,
SQLException {
-
- final String fullTableName = "FLUME_CSV_TEST";
- initSinkContextWithDefaults(fullTableName);
-
- sink = new PhoenixSink();
- Configurables.configure(sink, sinkContext);
- assertEquals(LifecycleState.IDLE, sink.getLifecycleState());
-
- final Channel channel = this.initChannel();
- sink.setChannel(channel);
-
- sink.start();
- final String eventBody = "kalyan,\"abc,pqr,xyz\",\"1,2,3,4\"";
- final Event event =
EventBuilder.withBody(Bytes.toBytes(eventBody));
- // put event in channel
- Transaction transaction = channel.getTransaction();
- transaction.begin();
- channel.put(event);
- transaction.commit();
- transaction.close();
-
- sink.process();
-
- int rowsInDb = countRows(fullTableName);
- assertEquals(0, rowsInDb);
-
- sink.stop();
- assertEquals(LifecycleState.STOP, sink.getLifecycleState());
-
- dropTable(fullTableName);
- }
-
- @Test
- public void testBatchEvents() throws EventDeliveryException,
SQLException {
-
- final String fullTableName = "FLUME_CSV_TEST";
- initSinkContextWithDefaults(fullTableName);
-
- sink = new PhoenixSink();
- Configurables.configure(sink, sinkContext);
- assertEquals(LifecycleState.IDLE, sink.getLifecycleState());
-
- final Channel channel = this.initChannel();
- sink.setChannel(channel);
-
- sink.start();
- int numEvents = 150;
- String col1 = "val1";
- String a1 = "\"aaa,bbb,ccc\"";
- String a2 = "\"1,2,3,4\"";
- String eventBody = null;
- List<Event> eventList = new ArrayList<>(numEvents);
- for (int i = 0; i < eventList.size(); i++) {
- eventBody = (col1 + i) + "," + i * 10.5 + "," + a1 +
"," + a2;
- Event event =
EventBuilder.withBody(Bytes.toBytes(eventBody));
- eventList.add(event);
- }
-
- // put event in channel
- Transaction transaction = channel.getTransaction();
- transaction.begin();
- for (Event event : eventList) {
- channel.put(event);
- }
- transaction.commit();
- transaction.close();
-
- sink.process();
-
- int rowsInDb = countRows(fullTableName);
- assertEquals(eventList.size(), rowsInDb);
-
- sink.stop();
- assertEquals(LifecycleState.STOP, sink.getLifecycleState());
-
- dropTable(fullTableName);
- }
-
- @Test
- public void testEventsWithHeaders() throws Exception {
-
- sinkContext = new Context();
- final String fullTableName = "FLUME_CSV_TEST";
- final String ddl = "CREATE TABLE IF NOT EXISTS "
- + fullTableName
- + " (rowkey VARCHAR not null, col1 varchar ,
col2 double, col3 varchar[], col4 integer[], host varchar , source varchar \n"
- + " CONSTRAINT pk PRIMARY KEY (rowkey))\n";
- String columns = "col1,col2,col3,col4";
- String rowkeyType = DefaultKeyGenerator.UUID.name();
- String headers = "host,source";
- initSinkContext(fullTableName, ddl, columns, null, null, null,
null, rowkeyType, headers);
-
- sink = new PhoenixSink();
- Configurables.configure(sink, sinkContext);
- assertEquals(LifecycleState.IDLE, sink.getLifecycleState());
-
- final Channel channel = this.initChannel();
- sink.setChannel(channel);
-
- sink.start();
-
- int numEvents = 10;
- String col1 = "val1";
- String a1 = "\"aaa,bbb,ccc\"";
- String a2 = "\"1,2,3,4\"";
- String hostHeader = "host1";
- String sourceHeader = "source1";
- String eventBody = null;
- List<Event> eventList = new ArrayList<>(numEvents);
- for (int i = 0; i < numEvents; i++) {
- eventBody = (col1 + i) + "," + i * 10.5 + "," + a1 +
"," + a2;
- Map<String, String> headerMap = new HashMap<>(2);
- headerMap.put("host", hostHeader);
- headerMap.put("source", sourceHeader);
- Event event =
EventBuilder.withBody(Bytes.toBytes(eventBody), headerMap);
- eventList.add(event);
- }
-
- // put event in channel
- Transaction transaction = channel.getTransaction();
- transaction.begin();
- for (Event event : eventList) {
- channel.put(event);
- }
- transaction.commit();
- transaction.close();
-
- sink.process();
-
- final String query = " SELECT * FROM \n " + fullTableName;
- Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
- final ResultSet rs;
- final Connection conn = DriverManager.getConnection(getUrl(),
props);
- try {
- rs = conn.createStatement().executeQuery(query);
- assertTrue(rs.next());
- assertEquals("host1", rs.getString("host"));
- assertEquals("source1", rs.getString("source"));
-
- assertTrue(rs.next());
- assertEquals("host1", rs.getString("host"));
- assertEquals("source1", rs.getString("source"));
- } finally {
- if (conn != null) {
- conn.close();
- }
- }
- sink.stop();
- assertEquals(LifecycleState.STOP, sink.getLifecycleState());
-
- dropTable(fullTableName);
- }
-
- private Channel initChannel() {
- // Channel configuration
- Context channelContext = new Context();
- channelContext.put("capacity", "10000");
- channelContext.put("transactionCapacity", "200");
-
- Channel channel = new MemoryChannel();
- channel.setName("memorychannel");
- Configurables.configure(channel, channelContext);
- return channel;
- }
-
- private void initSinkContext(final String fullTableName, final String
ddl, final String columns,
- final String csvDelimiter, final String csvQuote, final
String csvEscape, final String csvArrayDelimiter,
- final String rowkeyType, final String headers) {
- if (fullTableName == null){
- throw new NullPointerException();
- }
- sinkContext = new Context();
- sinkContext.put(FlumeConstants.CONFIG_TABLE, fullTableName);
- sinkContext.put(FlumeConstants.CONFIG_JDBC_URL, getUrl());
- sinkContext.put(FlumeConstants.CONFIG_SERIALIZER,
EventSerializers.CSV.name());
- sinkContext.put(FlumeConstants.CONFIG_TABLE_DDL, ddl);
- sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX +
FlumeConstants.CONFIG_COLUMN_NAMES, columns);
- if (null != csvDelimiter)
- sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX
+ FlumeConstants.CSV_DELIMITER, csvDelimiter);
- if (null != csvQuote)
- sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX
+ FlumeConstants.CSV_QUOTE, csvQuote);
- if (null != csvEscape)
- sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX
+ FlumeConstants.CSV_ESCAPE, csvEscape);
- if (null != csvArrayDelimiter)
- sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX
+ FlumeConstants.CSV_ARRAY_DELIMITER,
- csvArrayDelimiter);
- if (null != rowkeyType)
- sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX
+ FlumeConstants.CONFIG_ROWKEY_TYPE_GENERATOR,
- rowkeyType);
- if (null != headers)
- sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX
+ FlumeConstants.CONFIG_HEADER_NAMES, headers);
- }
-
- private void initSinkContextWithDefaults(final String fullTableName) {
- String ddl = "CREATE TABLE IF NOT EXISTS " + fullTableName
- + " (flume_time timestamp not null, col1
varchar , col2 double, col3 varchar[], col4 integer[]"
- + " CONSTRAINT pk PRIMARY KEY (flume_time))\n";
- String columns = "col1,col2,col3,col4";
- String rowkeyType = DefaultKeyGenerator.TIMESTAMP.name();
- initSinkContext(fullTableName, ddl, columns, null, null, null,
null, rowkeyType, null);
- }
-
- private void setConfig(final String configName, final String
configValue) {
- if (sinkContext == null){
- throw new NullPointerException();
- }
- if (configName == null){
- throw new NullPointerException();
- }
- if (configValue == null){
- throw new NullPointerException();
- }
- sinkContext.put(configName, configValue);
- }
-
- private int countRows(final String fullTableName) throws SQLException {
- if (fullTableName == null){
- throw new NullPointerException();
- }
- Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
- final Connection conn = DriverManager.getConnection(getUrl(),
props);
- ResultSet rs = null;
- try {
- rs = conn.createStatement().executeQuery("select
count(*) from " + fullTableName);
- int rowsCount = 0;
- while (rs.next()) {
- rowsCount = rs.getInt(1);
- }
- return rowsCount;
-
- } finally {
- if (rs != null) {
- rs.close();
- }
- if (conn != null) {
- conn.close();
- }
- }
-
- }
-
- private void dropTable(final String fullTableName) throws SQLException {
- if (fullTableName == null){
- throw new NullPointerException();
- }
-
- Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
- final Connection conn = DriverManager.getConnection(getUrl(),
props);
- try {
- conn.createStatement().execute("drop table if exists "
+ fullTableName);
- } finally {
- if (conn != null) {
- conn.close();
- }
- }
- }
-
-}
diff --git a/phoenix5-flume/src/it/java/org/apache/phoenix/flume/JsonEventSerializerIT.java b/phoenix5-flume/src/it/java/org/apache/phoenix/flume/JsonEventSerializerIT.java
deleted file mode 100644
index d53a560..0000000
--- a/phoenix5-flume/src/it/java/org/apache/phoenix/flume/JsonEventSerializerIT.java
+++ /dev/null
@@ -1,575 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.flume;
-
-import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-import org.apache.flume.Channel;
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.EventDeliveryException;
-import org.apache.flume.Transaction;
-import org.apache.flume.channel.MemoryChannel;
-import org.apache.flume.conf.Configurables;
-import org.apache.flume.event.EventBuilder;
-import org.apache.flume.lifecycle.LifecycleState;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
-import org.apache.phoenix.query.BaseTest;
-import org.apache.phoenix.flume.serializer.EventSerializers;
-import org.apache.phoenix.flume.sink.PhoenixSink;
-import org.apache.phoenix.query.BaseTest;
-import org.apache.phoenix.util.PropertiesUtil;
-import org.apache.phoenix.util.ReadOnlyProps;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-@Category(NeedsOwnMiniClusterTest.class)
-public class JsonEventSerializerIT extends BaseTest {
-
- private Context sinkContext;
- private PhoenixSink sink;
-
- @BeforeClass
- public static synchronized void doSetup() throws Exception {
- setUpTestDriver(ReadOnlyProps.EMPTY_PROPS);
- }
-
- @AfterClass
- public static synchronized void doTeardown() throws Exception {
- dropNonSystemTables();
- }
-
- @After
- public void cleanUpAfterTest() throws Exception {
- deletePriorMetaData(HConstants.LATEST_TIMESTAMP, getUrl());
- }
-
- @Test
- public void testWithOutColumnsMapping() throws EventDeliveryException,
SQLException {
-
- final String fullTableName = "FLUME_JSON_TEST";
-
- String ddl = "CREATE TABLE IF NOT EXISTS " + fullTableName
- + " (flume_time timestamp not null, col1
varchar , col2 double, col3 varchar[], col4 integer[]"
- + " CONSTRAINT pk PRIMARY KEY (flume_time))\n";
- String columns = "col1,col2,col3,col4";
- String rowkeyType = DefaultKeyGenerator.TIMESTAMP.name();
- initSinkContext(fullTableName, ddl, columns, null, rowkeyType,
null);
-
- sink = new PhoenixSink();
- Configurables.configure(sink, sinkContext);
-
- assertEquals(LifecycleState.IDLE, sink.getLifecycleState());
-
- final Channel channel = this.initChannel();
- sink.setChannel(channel);
-
- sink.start();
- final String eventBody = "{\"col1\" : \"kalyan\", \"col2\" :
10.5, \"col3\" : [\"abc\",\"pqr\",\"xyz\"], \"col4\" : [1,2,3,4]}";
- final Event event =
EventBuilder.withBody(Bytes.toBytes(eventBody));
- // put event in channel
- Transaction transaction = channel.getTransaction();
- transaction.begin();
- channel.put(event);
- transaction.commit();
- transaction.close();
-
- sink.process();
-
- int rowsInDb = countRows(fullTableName);
- assertEquals(1, rowsInDb);
-
- sink.stop();
- assertEquals(LifecycleState.STOP, sink.getLifecycleState());
-
- dropTable(fullTableName);
- }
-
- @Test
- public void testDifferentColumnNames() throws EventDeliveryException,
SQLException {
-
- final String fullTableName = "FLUME_JSON_TEST";
-
- String ddl = "CREATE TABLE IF NOT EXISTS " + fullTableName
- + " (flume_time timestamp not null, col1
varchar , col2 double, col3 varchar[], col4 integer[]"
- + " CONSTRAINT pk PRIMARY KEY (flume_time))\n";
- String columns = "col1,col2,col3,col4";
- String rowkeyType = DefaultKeyGenerator.TIMESTAMP.name();
- String columnsMapping =
"{\"col1\":\"col1\",\"col2\":\"f2\",\"col3\":\"f3\",\"col4\":\"col4\"}";
-
- initSinkContext(fullTableName, ddl, columns, columnsMapping,
rowkeyType, null);
-
- sink = new PhoenixSink();
- Configurables.configure(sink, sinkContext);
-
- assertEquals(LifecycleState.IDLE, sink.getLifecycleState());
-
- final Channel channel = this.initChannel();
- sink.setChannel(channel);
-
- sink.start();
- final String eventBody = "{\"col1\" : \"kalyan\", \"f2\" :
10.5, \"f3\" : [\"abc\",\"pqr\",\"xyz\"], \"col4\" : [1,2,3,4]}";
- final Event event =
EventBuilder.withBody(Bytes.toBytes(eventBody));
- // put event in channel
- Transaction transaction = channel.getTransaction();
- transaction.begin();
- channel.put(event);
- transaction.commit();
- transaction.close();
-
- sink.process();
-
- int rowsInDb = countRows(fullTableName);
- assertEquals(1, rowsInDb);
-
- sink.stop();
- assertEquals(LifecycleState.STOP, sink.getLifecycleState());
-
- dropTable(fullTableName);
- }
-
- @Test
- public void testInnerColumns() throws EventDeliveryException,
SQLException {
-
- final String fullTableName = "FLUME_JSON_TEST";
-
- String ddl = "CREATE TABLE IF NOT EXISTS " + fullTableName
- + " (flume_time timestamp not null, col1
varchar , col2 double, col3 varchar[], col4 integer[]"
- + " CONSTRAINT pk PRIMARY KEY (flume_time))\n";
- String columns = "col1,col2,col3,col4";
- String rowkeyType = DefaultKeyGenerator.TIMESTAMP.name();
- String columnsMapping =
"{\"col1\":\"col1\",\"col2\":\"x.y\",\"col3\":\"a.b1.c\",\"col4\":\"col4\"}";
-
- initSinkContext(fullTableName, ddl, columns, columnsMapping,
rowkeyType, null);
-
- sink = new PhoenixSink();
- Configurables.configure(sink, sinkContext);
-
- assertEquals(LifecycleState.IDLE, sink.getLifecycleState());
-
- final Channel channel = this.initChannel();
- sink.setChannel(channel);
-
- sink.start();
- final String eventBody = "{\"col1\" : \"kalyan\", \"x\" :
{\"y\" : 10.5}, \"a\" : {\"b1\" : {\"c\" : [\"abc\",\"pqr\",\"xyz\"] }, \"b2\"
: 111}, \"col4\" : [1,2,3,4]}";
- final Event event =
EventBuilder.withBody(Bytes.toBytes(eventBody));
- // put event in channel
- Transaction transaction = channel.getTransaction();
- transaction.begin();
- channel.put(event);
- transaction.commit();
- transaction.close();
-
- sink.process();
-
- int rowsInDb = countRows(fullTableName);
- assertEquals(1, rowsInDb);
-
- sink.stop();
- assertEquals(LifecycleState.STOP, sink.getLifecycleState());
-
- dropTable(fullTableName);
- }
-
- @Test
- public void testInnerColumnsWithArrayMapping() throws
EventDeliveryException, SQLException {
-
- final String fullTableName = "FLUME_JSON_TEST";
-
- String ddl = "CREATE TABLE IF NOT EXISTS " + fullTableName
- + " (flume_time timestamp not null, col1
varchar , col2 double, col3 varchar[], col4 integer[]"
- + " CONSTRAINT pk PRIMARY KEY (flume_time))\n";
- String columns = "col1,col2,col3,col4";
- String rowkeyType = DefaultKeyGenerator.TIMESTAMP.name();
- String columnsMapping =
"{\"col1\":\"col1\",\"col2\":\"x.y\",\"col3\":\"a.b[*].c\",\"col4\":\"col4\"}";
-
- initSinkContext(fullTableName, ddl, columns, columnsMapping,
rowkeyType, null);
-
- sink = new PhoenixSink();
- Configurables.configure(sink, sinkContext);
-
- assertEquals(LifecycleState.IDLE, sink.getLifecycleState());
-
- final Channel channel = this.initChannel();
- sink.setChannel(channel);
-
- sink.start();
- final String eventBody = "{\"col1\" : \"kalyan\", \"x\" :
{\"y\" : 10.5}, \"a\" : {\"b\" : [{\"c\" : \"abc\"}, {\"c\" : \"pqr\"}, {\"c\"
: \"xyz\"}] , \"b2\" : 111}, \"col4\" : [1,2,3,4]}";
- final Event event =
EventBuilder.withBody(Bytes.toBytes(eventBody));
- // put event in channel
- Transaction transaction = channel.getTransaction();
- transaction.begin();
- channel.put(event);
- transaction.commit();
- transaction.close();
-
- sink.process();
-
- int rowsInDb = countRows(fullTableName);
- assertEquals(1, rowsInDb);
-
- sink.stop();
- assertEquals(LifecycleState.STOP, sink.getLifecycleState());
-
- dropTable(fullTableName);
- }
-
- @Test
- public void testKeyGenerator() throws EventDeliveryException,
SQLException {
-
- final String fullTableName = "FLUME_JSON_TEST";
- initSinkContextWithDefaults(fullTableName);
-
- sink = new PhoenixSink();
- Configurables.configure(sink, sinkContext);
-
- assertEquals(LifecycleState.IDLE, sink.getLifecycleState());
-
- final Channel channel = this.initChannel();
- sink.setChannel(channel);
-
- sink.start();
- final String eventBody = "{\"col1\" : \"kalyan\", \"col2\" :
10.5, \"col3\" : [\"abc\",\"pqr\",\"xyz\"], \"col4\" : [1,2,3,4]}";
- final Event event =
EventBuilder.withBody(Bytes.toBytes(eventBody));
- // put event in channel
- Transaction transaction = channel.getTransaction();
- transaction.begin();
- channel.put(event);
- transaction.commit();
- transaction.close();
-
- sink.process();
-
- int rowsInDb = countRows(fullTableName);
- assertEquals(1, rowsInDb);
-
- sink.stop();
- assertEquals(LifecycleState.STOP, sink.getLifecycleState());
-
- dropTable(fullTableName);
- }
-
- @Test
- public void testMismatchKeyGenerator() throws EventDeliveryException,
SQLException {
-
- final String fullTableName = "FLUME_JSON_TEST";
- initSinkContextWithDefaults(fullTableName);
- setConfig(FlumeConstants.CONFIG_SERIALIZER_PREFIX +
FlumeConstants.CONFIG_ROWKEY_TYPE_GENERATOR,
- DefaultKeyGenerator.UUID.name());
-
- sink = new PhoenixSink();
- Configurables.configure(sink, sinkContext);
- assertEquals(LifecycleState.IDLE, sink.getLifecycleState());
-
- final Channel channel = this.initChannel();
- sink.setChannel(channel);
-
- sink.start();
- final String eventBody = "{\"col1\" : \"kalyan\", \"col2\" :
10.5, \"col3\" : [\"abc\",\"pqr\",\"xyz\"], \"col4\" : [1,2,3,4]}";
- final Event event =
EventBuilder.withBody(Bytes.toBytes(eventBody));
- // put event in channel
- Transaction transaction = channel.getTransaction();
- transaction.begin();
- channel.put(event);
- transaction.commit();
- transaction.close();
-
- try {
- sink.process();
- fail();
- } catch (Exception ex) {
-
assertTrue(ex.getCause().getMessage().contains("java.lang.IllegalArgumentException:
Invalid format:"));
- }
-
- dropTable(fullTableName);
- }
-
- @Test
- public void testMissingColumnsInEvent() throws EventDeliveryException,
SQLException {
-
- final String fullTableName = "FLUME_JSON_TEST";
- initSinkContextWithDefaults(fullTableName);
-
- sink = new PhoenixSink();
- Configurables.configure(sink, sinkContext);
- assertEquals(LifecycleState.IDLE, sink.getLifecycleState());
-
- final Channel channel = this.initChannel();
- sink.setChannel(channel);
-
- sink.start();
- final String eventBody = "{\"col1\" : \"kalyan\", \"col3\" :
[\"abc\",\"pqr\",\"xyz\"], \"col4\" : [1,2,3,4]}";
- final Event event =
EventBuilder.withBody(Bytes.toBytes(eventBody));
- // put event in channel
- Transaction transaction = channel.getTransaction();
- transaction.begin();
- channel.put(event);
- transaction.commit();
- transaction.close();
-
- sink.process();
-
- int rowsInDb = countRows(fullTableName);
- assertEquals(0, rowsInDb);
-
- sink.stop();
- assertEquals(LifecycleState.STOP, sink.getLifecycleState());
-
- dropTable(fullTableName);
- }
-
- @Test
- public void testBatchEvents() throws EventDeliveryException,
SQLException {
-
- final String fullTableName = "FLUME_JSON_TEST";
- initSinkContextWithDefaults(fullTableName);
-
- sink = new PhoenixSink();
- Configurables.configure(sink, sinkContext);
- assertEquals(LifecycleState.IDLE, sink.getLifecycleState());
-
- final Channel channel = this.initChannel();
- sink.setChannel(channel);
-
- sink.start();
- int numEvents = 150;
- String col1 = "val1";
- String a1 = "[aaa,bbb,ccc]";
- String a2 = "[1,2,3,4]";
- String eventBody = null;
- List<Event> eventList = new ArrayList<>(numEvents);
- for (int i = 0; i < eventList.size(); i++) {
- eventBody = "{\"col1\" : \"" + (col1 + i) + "\",
\"col2\" : " + i * 10.5 + " , \"col3\" : " + a1
- + " , \"col4\" : " + a2 + "}";
- Event event =
EventBuilder.withBody(Bytes.toBytes(eventBody));
- eventList.add(event);
- }
-
- // put event in channel
- Transaction transaction = channel.getTransaction();
- transaction.begin();
- for (Event event : eventList) {
- channel.put(event);
- }
- transaction.commit();
- transaction.close();
-
- sink.process();
-
- int rowsInDb = countRows(fullTableName);
- assertEquals(eventList.size(), rowsInDb);
-
- sink.stop();
- assertEquals(LifecycleState.STOP, sink.getLifecycleState());
-
- dropTable(fullTableName);
- }
-
- @Test
- public void testEventsWithHeaders() throws Exception {
-
- sinkContext = new Context();
- final String fullTableName = "FLUME_JSON_TEST";
- final String ddl = "CREATE TABLE IF NOT EXISTS "
- + fullTableName
- + " (rowkey VARCHAR not null, col1 varchar ,
col2 double, col3 varchar[], col4 integer[], host varchar , source varchar \n"
- + " CONSTRAINT pk PRIMARY KEY (rowkey))\n";
- String columns = "col1,col2,col3,col4";
- String columnsMapping =
"{\"col1\":\"col1\",\"col2\":\"col2\",\"col3\":\"col3\",\"col4\":\"col4\"}";
- String rowkeyType = DefaultKeyGenerator.UUID.name();
- String headers = "host,source";
- initSinkContext(fullTableName, ddl, columns, columnsMapping,
rowkeyType, headers);
-
- sink = new PhoenixSink();
- Configurables.configure(sink, sinkContext);
- assertEquals(LifecycleState.IDLE, sink.getLifecycleState());
-
- final Channel channel = this.initChannel();
- sink.setChannel(channel);
-
- sink.start();
-
- int numEvents = 10;
- String col1 = "val1";
- String a1 = "[aaa,bbb,ccc]";
- String a2 = "[1,2,3,4]";
- String hostHeader = "host1";
- String sourceHeader = "source1";
- String eventBody = null;
- List<Event> eventList = new ArrayList<>(numEvents);
- for (int i = 0; i < numEvents; i++) {
- eventBody = "{\"col1\" : \"" + (col1 + i) + "\",
\"col2\" : " + i * 10.5 + " , \"col3\" : " + a1
- + " , \"col4\" : " + a2 + "}";
- Map<String, String> headerMap = new HashMap<>(2);
- headerMap.put("host", hostHeader);
- headerMap.put("source", sourceHeader);
- Event event =
EventBuilder.withBody(Bytes.toBytes(eventBody), headerMap);
- eventList.add(event);
- }
-
- // put event in channel
- Transaction transaction = channel.getTransaction();
- transaction.begin();
- for (Event event : eventList) {
- channel.put(event);
- }
- transaction.commit();
- transaction.close();
-
- sink.process();
-
- final String query = " SELECT * FROM \n " + fullTableName;
- Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
- final ResultSet rs;
- final Connection conn = DriverManager.getConnection(getUrl(),
props);
- try {
- rs = conn.createStatement().executeQuery(query);
- assertTrue(rs.next());
- assertEquals("host1", rs.getString("host"));
- assertEquals("source1", rs.getString("source"));
-
- assertTrue(rs.next());
- assertEquals("host1", rs.getString("host"));
- assertEquals("source1", rs.getString("source"));
- } finally {
- if (conn != null) {
- conn.close();
- }
- }
- sink.stop();
- assertEquals(LifecycleState.STOP, sink.getLifecycleState());
-
- dropTable(fullTableName);
- }
-
- private Channel initChannel() {
- // Channel configuration
- Context channelContext = new Context();
- channelContext.put("capacity", "10000");
- channelContext.put("transactionCapacity", "200");
-
- Channel channel = new MemoryChannel();
- channel.setName("memorychannel");
- Configurables.configure(channel, channelContext);
- return channel;
- }
-
- private void initSinkContext(final String fullTableName, final String
ddl, final String columns,
- final String columnsMapping, final String rowkeyType,
final String headers) {
- if (fullTableName == null) {
- throw new NullPointerException();
- }
- sinkContext = new Context();
- sinkContext.put(FlumeConstants.CONFIG_TABLE, fullTableName);
- sinkContext.put(FlumeConstants.CONFIG_JDBC_URL, getUrl());
- sinkContext.put(FlumeConstants.CONFIG_SERIALIZER,
EventSerializers.JSON.name());
- sinkContext.put(FlumeConstants.CONFIG_TABLE_DDL, ddl);
- sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX +
FlumeConstants.CONFIG_COLUMN_NAMES, columns);
- if (null != columnsMapping)
- sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX
+ FlumeConstants.CONFIG_COLUMNS_MAPPING,
- columnsMapping);
- if (null != rowkeyType)
- sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX
+ FlumeConstants.CONFIG_ROWKEY_TYPE_GENERATOR,
- rowkeyType);
- if (null != headers)
- sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX
+ FlumeConstants.CONFIG_HEADER_NAMES, headers);
- }
-
- private void initSinkContextWithDefaults(final String fullTableName) {
- String ddl = "CREATE TABLE IF NOT EXISTS " + fullTableName
- + " (flume_time timestamp not null, col1
varchar , col2 double, col3 varchar[], col4 integer[]"
- + " CONSTRAINT pk PRIMARY KEY (flume_time))\n";
- String columns = "col1,col2,col3,col4";
- String columnsMapping =
"{\"col1\":\"col1\",\"col2\":\"col2\",\"col3\":\"col3\",\"col4\":\"col4\"}";
- String rowkeyType = DefaultKeyGenerator.TIMESTAMP.name();
- initSinkContext(fullTableName, ddl, columns, columnsMapping,
rowkeyType, null);
- }
-
- private void setConfig(final String configName, final String
configValue) {
- if (sinkContext == null){
- throw new NullPointerException();
- }
- if (configName == null){
- throw new NullPointerException();
- }
- if (configValue == null){
- throw new NullPointerException();
- }
- sinkContext.put(configName, configValue);
- }
-
- private int countRows(final String fullTableName) throws SQLException {
- if (fullTableName == null){
- throw new NullPointerException();
- }
- Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
- final Connection conn = DriverManager.getConnection(getUrl(),
props);
- ResultSet rs = null;
- try {
- rs = conn.createStatement().executeQuery("select
count(*) from " + fullTableName);
- int rowsCount = 0;
- while (rs.next()) {
- rowsCount = rs.getInt(1);
- }
- return rowsCount;
-
- } finally {
- if (rs != null) {
- rs.close();
- }
- if (conn != null) {
- conn.close();
- }
- }
-
- }
-
- private void dropTable(final String fullTableName) throws SQLException {
- if (fullTableName == null){
- throw new NullPointerException();
- }
- Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
- final Connection conn = DriverManager.getConnection(getUrl(),
props);
- try {
- conn.createStatement().execute("drop table if exists "
+ fullTableName);
- } finally {
- if (conn != null) {
- conn.close();
- }
- }
- }
-
-}
diff --git a/phoenix5-flume/src/it/java/org/apache/phoenix/flume/PhoenixSinkIT.java b/phoenix5-flume/src/it/java/org/apache/phoenix/flume/PhoenixSinkIT.java
deleted file mode 100644
index b9fe978..0000000
--- a/phoenix5-flume/src/it/java/org/apache/phoenix/flume/PhoenixSinkIT.java
+++ /dev/null
@@ -1,290 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.flume;
-
-import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.util.Properties;
-
-import org.apache.flume.Channel;
-import org.apache.flume.Context;
-import org.apache.flume.Sink;
-import org.apache.flume.SinkFactory;
-import org.apache.flume.Transaction;
-import org.apache.flume.channel.MemoryChannel;
-import org.apache.flume.conf.Configurables;
-import org.apache.flume.event.EventBuilder;
-import org.apache.flume.lifecycle.LifecycleState;
-import org.apache.flume.sink.DefaultSinkFactory;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
-import org.apache.phoenix.flume.serializer.CustomSerializer;
-import org.apache.phoenix.flume.serializer.EventSerializers;
-import org.apache.phoenix.flume.sink.NullPhoenixSink;
-import org.apache.phoenix.flume.sink.PhoenixSink;
-import org.apache.phoenix.util.PropertiesUtil;
-import org.apache.phoenix.util.ReadOnlyProps;
-import org.apache.phoenix.util.TestUtil;
-import org.apache.phoenix.query.BaseTest;
-import org.junit.Assert;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-@Category(NeedsOwnMiniClusterTest.class)
-public class PhoenixSinkIT extends BaseTest {
-
- private Context sinkContext;
- private PhoenixSink sink;
-
- @BeforeClass
- public static synchronized void doSetup() throws Exception {
- setUpTestDriver(ReadOnlyProps.EMPTY_PROPS);
- }
-
- @AfterClass
- public static synchronized void doTeardown() throws Exception {
- dropNonSystemTables();
- }
-
- @After
- public void cleanUpAfterTest() throws Exception {
- deletePriorMetaData(HConstants.LATEST_TIMESTAMP, getUrl());
- }
-
- @Test
- public void testSinkCreation() {
- SinkFactory factory = new DefaultSinkFactory ();
- Sink sink = factory.create("PhoenixSink__",
"org.apache.phoenix.flume.sink.PhoenixSink");
- Assert.assertNotNull(sink);
- Assert.assertTrue(PhoenixSink.class.isInstance(sink));
- }
- @Test
- public void testConfiguration () {
-
- sinkContext = new Context ();
- sinkContext.put(FlumeConstants.CONFIG_TABLE, "test");
- sinkContext.put(FlumeConstants.CONFIG_JDBC_URL, getUrl());
-
sinkContext.put(FlumeConstants.CONFIG_SERIALIZER,EventSerializers.REGEX.name());
- sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX +
FlumeConstants.CONFIG_COLUMN_NAMES,"col1,col2");
- sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX +
FlumeConstants.CONFIG_ROWKEY_TYPE_GENERATOR,DefaultKeyGenerator.TIMESTAMP.name());
-
- sink = new PhoenixSink();
- Configurables.configure(sink, sinkContext);
- }
-
-
-
- @Test(expected = NullPointerException.class)
- public void testInvalidConfiguration () {
-
- sinkContext = new Context ();
- sinkContext.put(FlumeConstants.CONFIG_JDBC_URL, getUrl());
-
sinkContext.put(FlumeConstants.CONFIG_SERIALIZER,EventSerializers.REGEX.name());
- sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX +
FlumeConstants.CONFIG_COLUMN_NAMES,"col1,col2");
- sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX +
FlumeConstants.CONFIG_ROWKEY_TYPE_GENERATOR,DefaultKeyGenerator.TIMESTAMP.name());
-
- sink = new PhoenixSink();
- Configurables.configure(sink, sinkContext);
- }
-
- @Test(expected=RuntimeException.class)
- public void testInvalidConfigurationOfSerializer () {
-
- sinkContext = new Context ();
- sinkContext.put(FlumeConstants.CONFIG_TABLE, "test");
- sinkContext.put(FlumeConstants.CONFIG_JDBC_URL, getUrl());
- sinkContext.put(FlumeConstants.CONFIG_SERIALIZER,"unknown");
- sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX +
FlumeConstants.CONFIG_COLUMN_NAMES,"col1,col2");
- sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX +
FlumeConstants.CONFIG_ROWKEY_TYPE_GENERATOR,DefaultKeyGenerator.TIMESTAMP.name());
-
- sink = new PhoenixSink();
- Configurables.configure(sink, sinkContext);
- }
-
- @Test
- public void testInvalidTable() {
- sinkContext = new Context ();
- sinkContext.put(FlumeConstants.CONFIG_TABLE, "flume_test");
- sinkContext.put(FlumeConstants.CONFIG_JDBC_URL, getUrl());
- sinkContext.put(FlumeConstants.CONFIG_SERIALIZER,
EventSerializers.REGEX.name());
- sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX +
FlumeConstants.CONFIG_COLUMN_NAMES, "col1,col2");
- sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX +
FlumeConstants.CONFIG_ROWKEY_TYPE_GENERATOR,DefaultKeyGenerator.TIMESTAMP.name());
-
- sink = new PhoenixSink();
- Configurables.configure(sink, sinkContext);
-
- final Channel channel = this.initChannel();
- sink.setChannel(channel);
- try {
- sink.start();
- fail();
- }catch(Exception e) {
- assertTrue(e.getMessage(), e.getMessage().contains("ERROR 1012
(42M03): Table undefined."));
- }
- }
-
- @Test
- public void testSinkLifecycle () {
- String tableName = generateUniqueName();
-
- String ddl = "CREATE TABLE " + tableName +
- " (flume_time timestamp not null, col1 varchar , col2
varchar" +
- " CONSTRAINT pk PRIMARY KEY (flume_time))\n";
-
- sinkContext = new Context ();
- sinkContext.put(FlumeConstants.CONFIG_TABLE, tableName);
- sinkContext.put(FlumeConstants.CONFIG_JDBC_URL, getUrl());
-
sinkContext.put(FlumeConstants.CONFIG_SERIALIZER,EventSerializers.REGEX.name());
- sinkContext.put(FlumeConstants.CONFIG_TABLE_DDL, ddl);
- sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX +
FlumeConstants.CONFIG_REGULAR_EXPRESSION,"^([^\t]+)\t([^\t]+)$");
- sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX +
FlumeConstants.CONFIG_COLUMN_NAMES,"col1,col2");
- sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX +
FlumeConstants.CONFIG_ROWKEY_TYPE_GENERATOR,DefaultKeyGenerator.TIMESTAMP.name());
-
-
- sink = new PhoenixSink();
- Configurables.configure(sink, sinkContext);
- Assert.assertEquals(LifecycleState.IDLE, sink.getLifecycleState());
-
- final Channel channel = this.initChannel();
- sink.setChannel(channel);
-
- sink.start();
- Assert.assertEquals(LifecycleState.START, sink.getLifecycleState());
- sink.stop();
- Assert.assertEquals(LifecycleState.STOP, sink.getLifecycleState());
- }
-
- @Test
- public void testCreateTable () throws Exception {
- String tableName = generateUniqueName();
- String ddl = "CREATE TABLE " + tableName + " " +
- " (flume_time timestamp not null, col1 varchar , col2
varchar" +
- " CONSTRAINT pk PRIMARY KEY (flume_time))\n";
-
- final String fullTableName = tableName;
- sinkContext = new Context ();
- sinkContext.put(FlumeConstants.CONFIG_TABLE, fullTableName);
- sinkContext.put(FlumeConstants.CONFIG_JDBC_URL, getUrl());
-
sinkContext.put(FlumeConstants.CONFIG_SERIALIZER,EventSerializers.REGEX.name());
- sinkContext.put(FlumeConstants.CONFIG_TABLE_DDL, ddl);
- sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX +
FlumeConstants.CONFIG_REGULAR_EXPRESSION,"^([^\t]+)\t([^\t]+)$");
- sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX +
FlumeConstants.CONFIG_COLUMN_NAMES,"col1,col2");
- sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX +
FlumeConstants.CONFIG_ROWKEY_TYPE_GENERATOR,
DefaultKeyGenerator.TIMESTAMP.name());
-
-
- sink = new PhoenixSink();
- Configurables.configure(sink, sinkContext);
- Assert.assertEquals(LifecycleState.IDLE, sink.getLifecycleState());
-
- final Channel channel = this.initChannel();
- sink.setChannel(channel);
-
- sink.start();
- try {
- Assert.assertTrue(driver.getConnectionQueryServices(getUrl(),
TestUtil.TEST_PROPERTIES).getAdmin()
- .tableExists(TableName.valueOf(fullTableName)));
- } finally {
- admin.close();
- }
- }
-
- @Test
- public void testExtendedSink() throws Exception {
- // Create a mock NullPhoenixSink which extends PhoenixSink, and verify
configure is invoked()
-
- PhoenixSink sink = mock(NullPhoenixSink.class);
- sinkContext = new Context();
- sinkContext.put(FlumeConstants.CONFIG_TABLE, "FLUME_TEST_EXTENDED");
- sinkContext.put(FlumeConstants.CONFIG_JDBC_URL, getUrl());
- sinkContext.put(FlumeConstants.CONFIG_SERIALIZER,
CustomSerializer.class.getName());
- sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX +
FlumeConstants.CONFIG_COLUMN_NAMES, "ID, COUNTS");
- sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX +
FlumeConstants.CONFIG_ROWKEY_TYPE_GENERATOR,
DefaultKeyGenerator.TIMESTAMP.name());
-
- Configurables.configure(sink, sinkContext);
- verify(sink).configure(sinkContext);
- }
-
- @Test
- public void testExtendedSerializer() throws Exception {
- /*
- Sadly, we can't mock a serializer, as the PhoenixSink does a
Class.forName() to instantiate
- it. Instead. we'll setup a Flume channel and verify the data our
custom serializer wrote.
- */
-
- final String fullTableName = "FLUME_TEST_EXTENDED";
- final String ddl = "CREATE TABLE " + fullTableName + " (ID BIGINT NOT
NULL PRIMARY KEY, COUNTS UNSIGNED_LONG)";
- Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
- final Connection conn = DriverManager.getConnection(getUrl(), props);
- conn.createStatement().execute(ddl);
- conn.commit();
-
- sinkContext = new Context();
- sinkContext.put(FlumeConstants.CONFIG_TABLE, "FLUME_TEST_EXTENDED");
- sinkContext.put(FlumeConstants.CONFIG_JDBC_URL, getUrl());
- sinkContext.put(FlumeConstants.CONFIG_SERIALIZER,
CustomSerializer.class.getName());
- sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX +
FlumeConstants.CONFIG_COLUMN_NAMES, "ID, COUNTS");
- sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX +
FlumeConstants.CONFIG_ROWKEY_TYPE_GENERATOR,
DefaultKeyGenerator.TIMESTAMP.name());
-
- PhoenixSink sink = new PhoenixSink();
- Configurables.configure(sink, sinkContext);
-
- // Send a test event through Flume, using our custom serializer
- final Channel channel = this.initChannel();
- sink.setChannel(channel);
- sink.start();
-
- final Transaction transaction = channel.getTransaction();
- transaction.begin();
- channel.put(EventBuilder.withBody(Bytes.toBytes("test event")));
- transaction.commit();
- transaction.close();
-
- sink.process();
- sink.stop();
-
- // Verify our serializer wrote out data
- ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM
FLUME_TEST_EXTENDED");
- assertTrue(rs.next());
- assertTrue(rs.getLong(1) == 1L);
- }
-
- private Channel initChannel() {
- //Channel configuration
- Context channelContext = new Context();
- channelContext.put("capacity", "10000");
- channelContext.put("transactionCapacity", "200");
-
- Channel channel = new MemoryChannel();
- channel.setName("memorychannel");
- Configurables.configure(channel, channelContext);
- return channel;
- }
-
-
-}
diff --git a/phoenix5-flume/src/it/java/org/apache/phoenix/flume/RegexEventSerializerIT.java b/phoenix5-flume/src/it/java/org/apache/phoenix/flume/RegexEventSerializerIT.java
deleted file mode 100644
index b849f10..0000000
--- a/phoenix5-flume/src/it/java/org/apache/phoenix/flume/RegexEventSerializerIT.java
+++ /dev/null
@@ -1,447 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.flume;
-
-import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-import org.apache.flume.Channel;
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.EventDeliveryException;
-import org.apache.flume.Transaction;
-import org.apache.flume.channel.MemoryChannel;
-import org.apache.flume.conf.Configurables;
-import org.apache.flume.event.EventBuilder;
-import org.apache.flume.lifecycle.LifecycleState;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
-import org.apache.phoenix.flume.serializer.EventSerializers;
-import org.apache.phoenix.flume.sink.PhoenixSink;
-import org.apache.phoenix.util.PropertiesUtil;
-import org.apache.phoenix.util.ReadOnlyProps;
-import org.apache.phoenix.query.BaseTest;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-@Category(NeedsOwnMiniClusterTest.class)
-public class RegexEventSerializerIT extends BaseTest {
-
- private Context sinkContext;
- private PhoenixSink sink;
-
- @BeforeClass
- public static synchronized void doSetup() throws Exception {
- setUpTestDriver(ReadOnlyProps.EMPTY_PROPS);
- }
-
- @AfterClass
- public static synchronized void doTeardown() throws Exception {
- dropNonSystemTables();
- }
-
- @After
- public void cleanUpAfterTest() throws Exception {
- deletePriorMetaData(HConstants.LATEST_TIMESTAMP, getUrl());
- }
-
- @Test
- public void testKeyGenerator() throws EventDeliveryException, SQLException
{
-
- final String fullTableName = generateUniqueName();
- initSinkContextWithDefaults(fullTableName);
-
- sink = new PhoenixSink();
- Configurables.configure(sink, sinkContext);
-
- assertEquals(LifecycleState.IDLE, sink.getLifecycleState());
-
- final Channel channel = this.initChannel();
- sink.setChannel(channel);
-
- sink.start();
- final String eventBody = "val1\tval2";
- final Event event = EventBuilder.withBody(Bytes.toBytes(eventBody));
- // put event in channel
- Transaction transaction = channel.getTransaction();
- transaction.begin();
- channel.put(event);
- transaction.commit();
- transaction.close();
-
- sink.process();
-
- int rowsInDb = countRows(fullTableName);
- assertEquals(1 , rowsInDb);
-
- sink.stop();
- assertEquals(LifecycleState.STOP, sink.getLifecycleState());
-
- }
-
-
- @Test
- public void testMismatchKeyGenerator() throws EventDeliveryException,
SQLException {
-
- final String fullTableName = generateUniqueName();
- initSinkContextWithDefaults(fullTableName);
- setConfig(FlumeConstants.CONFIG_SERIALIZER_PREFIX +
FlumeConstants.CONFIG_ROWKEY_TYPE_GENERATOR,DefaultKeyGenerator.UUID.name());
-
- sink = new PhoenixSink();
- Configurables.configure(sink, sinkContext);
- assertEquals(LifecycleState.IDLE, sink.getLifecycleState());
-
-
- final Channel channel = this.initChannel();
- sink.setChannel(channel);
-
- sink.start();
- final String eventBody = "val1\tval2";
- final Event event = EventBuilder.withBody(Bytes.toBytes(eventBody));
- // put event in channel
- Transaction transaction = channel.getTransaction();
- transaction.begin();
- channel.put(event);
- transaction.commit();
- transaction.close();
-
- try {
- sink.process();
- fail();
- }catch(Exception ex){
-
assertTrue(ex.getCause().getMessage().contains("java.lang.IllegalArgumentException:
Invalid format:"));
- }
- }
-
- @Test
- public void testMissingColumnsInEvent() throws EventDeliveryException,
SQLException {
-
- final String fullTableName = generateUniqueName();
- initSinkContextWithDefaults(fullTableName);
-
- sink = new PhoenixSink();
- Configurables.configure(sink, sinkContext);
- assertEquals(LifecycleState.IDLE, sink.getLifecycleState());
-
- final Channel channel = this.initChannel();
- sink.setChannel(channel);
-
- sink.start();
- final String eventBody = "val1";
- final Event event = EventBuilder.withBody(Bytes.toBytes(eventBody));
- // put event in channel
- Transaction transaction = channel.getTransaction();
- transaction.begin();
- channel.put(event);
- transaction.commit();
- transaction.close();
-
- sink.process();
-
- int rowsInDb = countRows(fullTableName);
- assertEquals(0 , rowsInDb);
-
- sink.stop();
- assertEquals(LifecycleState.STOP, sink.getLifecycleState());
-
- }
-
- @Test
- public void testBatchEvents() throws EventDeliveryException, SQLException {
-
- final String fullTableName = generateUniqueName();
- initSinkContextWithDefaults(fullTableName);
-
- sink = new PhoenixSink();
- Configurables.configure(sink, sinkContext);
- assertEquals(LifecycleState.IDLE, sink.getLifecycleState());
-
- final Channel channel = this.initChannel();
- sink.setChannel(channel);
-
- sink.start();
- int numEvents = 150;
- String col1 = "val1";
- String col2 = "val2";
- String eventBody = null;
- List<Event> eventList = new ArrayList<>(numEvents);
- for(int i = 0 ; i < eventList.size() ; i++) {
- eventBody = (col1 + i) + "\t" + (col2 + i);
- Event event = EventBuilder.withBody(Bytes.toBytes(eventBody));
- eventList.add(event);
- }
-
- // put event in channel
- Transaction transaction = channel.getTransaction();
- transaction.begin();
- for(Event event : eventList) {
- channel.put(event);
- }
- transaction.commit();
- transaction.close();
-
- sink.process();
-
- int rowsInDb = countRows(fullTableName);
- assertEquals(eventList.size(), rowsInDb);
-
- sink.stop();
- assertEquals(LifecycleState.STOP, sink.getLifecycleState());
-
- }
-
- @Test
- public void testApacheLogRegex() throws Exception {
-
- sinkContext = new Context ();
- final String fullTableName = generateUniqueName();
- final String logRegex = "([^ ]*) ([^ ]*) ([^ ]*) (-|\\[[^\\]]*\\])
\"([^ ]+) ([^ ]+)" +
- " ([^\"]+)\" (-|[0-9]*) (-|[0-9]*)(?: ([^
\"]*|\"[^\"]*\")" +
- " ([^ \"]*|\"[^\"]*\"))?";
-
- final String columns =
"host,identity,user,time,method,request,protocol,status,size,referer,agent";
-
- String ddl = "CREATE TABLE " + fullTableName +
- " (uid VARCHAR NOT NULL, user VARCHAR, time varchar, host
varchar , identity varchar, method varchar, request varchar , protocol
varchar," +
- " status integer , size integer , referer varchar , agent
varchar CONSTRAINT pk PRIMARY KEY (uid))\n";
-
- sinkContext.put(FlumeConstants.CONFIG_TABLE, fullTableName);
- sinkContext.put(FlumeConstants.CONFIG_JDBC_URL, getUrl());
-
sinkContext.put(FlumeConstants.CONFIG_SERIALIZER,EventSerializers.REGEX.name());
- sinkContext.put(FlumeConstants.CONFIG_TABLE_DDL, ddl);
- sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX +
FlumeConstants.CONFIG_REGULAR_EXPRESSION,logRegex);
- sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX +
FlumeConstants.CONFIG_COLUMN_NAMES,columns);
- sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX +
FlumeConstants.CONFIG_ROWKEY_TYPE_GENERATOR,DefaultKeyGenerator.UUID.name());
-
- String message1 = "33.22.11.00 - user1 [12/Dec/2013:07:01:19 +0000] " +
- "\"GET /wp-admin/css/install.css HTTP/1.0\" 200 813 " +
- "\"http://www.google.com\" \"Mozilla/5.0 (comp" +
- "atible; Yahoo! Slurp;
http://help.yahoo.com/help/us/ysearch/slurp)\"";
-
- String message2 = "192.168.20.1 - user2 [13/Dec/2013:06:05:19 +0000] "
+
- "\"GET /wp-admin/css/install.css HTTP/1.0\" 400 363 " +
- "\"http://www.salesforce.com/in/?ir=1\" \"Mozilla/5.0 (comp" +
- "atible;)\"";
-
-
- sink = new PhoenixSink();
- Configurables.configure(sink, sinkContext);
- assertEquals(LifecycleState.IDLE, sink.getLifecycleState());
-
- final Channel channel = this.initChannel();
- sink.setChannel(channel);
-
- sink.start();
-
- final Event event1 = EventBuilder.withBody(Bytes.toBytes(message1));
- final Event event2 = EventBuilder.withBody(Bytes.toBytes(message2));
-
- final Transaction transaction = channel.getTransaction();
- transaction.begin();
- channel.put(event1);
- channel.put(event2);
- transaction.commit();
- transaction.close();
-
- sink.process();
-
- final String query = " SELECT * FROM \n " + fullTableName;
- Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
- final ResultSet rs ;
- final Connection conn = DriverManager.getConnection(getUrl(), props);
- try{
- rs = conn.createStatement().executeQuery(query);
- assertTrue(rs.next());
- assertTrue(rs.next());
-
- }finally {
- if(conn != null) {
- conn.close();
- }
- }
- sink.stop();
- assertEquals(LifecycleState.STOP, sink.getLifecycleState());
-
- }
-
-
- @Test
- public void testEventsWithHeaders() throws Exception {
-
- sinkContext = new Context ();
- final String fullTableName = generateUniqueName();
- final String ddl = "CREATE TABLE " + fullTableName +
- " (rowkey VARCHAR not null, col1 varchar , cf1.col2 varchar ,
host varchar , source varchar \n" +
- " CONSTRAINT pk PRIMARY KEY (rowkey))\n";
-
- sinkContext.put(FlumeConstants.CONFIG_TABLE, fullTableName);
- sinkContext.put(FlumeConstants.CONFIG_JDBC_URL, getUrl());
-
sinkContext.put(FlumeConstants.CONFIG_SERIALIZER,EventSerializers.REGEX.name());
- sinkContext.put(FlumeConstants.CONFIG_TABLE_DDL, ddl);
- sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX +
FlumeConstants.CONFIG_REGULAR_EXPRESSION,"^([^\t]+)\t([^\t]+)$");
- sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX +
FlumeConstants.CONFIG_COLUMN_NAMES,"col1,cf1.col2");
- sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX +
FlumeConstants.CONFIG_HEADER_NAMES,"host,source");
- sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX +
FlumeConstants.CONFIG_ROWKEY_TYPE_GENERATOR,DefaultKeyGenerator.UUID.name());
-
- sink = new PhoenixSink();
- Configurables.configure(sink, sinkContext);
- assertEquals(LifecycleState.IDLE, sink.getLifecycleState());
-
- final Channel channel = this.initChannel();
- sink.setChannel(channel);
-
- sink.start();
-
- int numEvents = 10;
- String col1 = "val1";
- String col2 = "val2";
- String hostHeader = "host1";
- String sourceHeader = "source1";
- String eventBody = null;
- List<Event> eventList = new ArrayList<>(numEvents);
- for(int i = 0 ; i < numEvents ; i++) {
- eventBody = (col1 + i) + "\t" + (col2 + i);
- Map<String, String> headerMap = new HashMap<>(2);
- headerMap.put("host",hostHeader);
- headerMap.put("source",sourceHeader);
- Event event =
EventBuilder.withBody(Bytes.toBytes(eventBody),headerMap);
- eventList.add(event);
- }
-
- // put event in channel
- Transaction transaction = channel.getTransaction();
- transaction.begin();
- for(Event event : eventList) {
- channel.put(event);
- }
- transaction.commit();
- transaction.close();
-
- sink.process();
-
- final String query = " SELECT * FROM \n " + fullTableName;
- Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
- final ResultSet rs ;
- final Connection conn = DriverManager.getConnection(getUrl(), props);
- try{
- rs = conn.createStatement().executeQuery(query);
- assertTrue(rs.next());
- assertEquals("host1",rs.getString("host"));
- assertEquals("source1",rs.getString("source"));
-
- assertTrue(rs.next());
- assertEquals("host1",rs.getString("host"));
- assertEquals("source1",rs.getString("source"));
- }finally {
- if(conn != null) {
- conn.close();
- }
- }
- sink.stop();
- assertEquals(LifecycleState.STOP, sink.getLifecycleState());
-
- }
-
- private Channel initChannel() {
- //Channel configuration
- Context channelContext = new Context();
- channelContext.put("capacity", "10000");
- channelContext.put("transactionCapacity", "200");
-
- Channel channel = new MemoryChannel();
- channel.setName("memorychannel");
- Configurables.configure(channel, channelContext);
- return channel;
- }
-
- private void initSinkContextWithDefaults(final String fullTableName) {
- if (fullTableName == null){
- throw new NullPointerException();
- }
- sinkContext = new Context ();
- String ddl = "CREATE TABLE " + fullTableName +
- " (flume_time timestamp not null, col1 varchar , col2
varchar" +
- " CONSTRAINT pk PRIMARY KEY (flume_time))\n";
-
- sinkContext.put(FlumeConstants.CONFIG_TABLE, fullTableName);
- sinkContext.put(FlumeConstants.CONFIG_JDBC_URL, getUrl());
-
sinkContext.put(FlumeConstants.CONFIG_SERIALIZER,EventSerializers.REGEX.name());
- sinkContext.put(FlumeConstants.CONFIG_TABLE_DDL, ddl);
- sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX +
FlumeConstants.CONFIG_REGULAR_EXPRESSION,"^([^\t]+)\t([^\t]+)$");
- sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX +
FlumeConstants.CONFIG_COLUMN_NAMES,"col1,col2");
- sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX +
FlumeConstants.CONFIG_ROWKEY_TYPE_GENERATOR,DefaultKeyGenerator.TIMESTAMP.name());
-
- }
-
- private void setConfig(final String configName , final String configValue)
{
- if (sinkContext == null){
- throw new NullPointerException();
- }
- if (configName == null){
- throw new NullPointerException();
- }
- if (configValue == null){
- throw new NullPointerException();
- }
- sinkContext.put(configName, configValue);
- }
-
- private int countRows(final String fullTableName) throws SQLException {
- if (fullTableName == null){
- throw new NullPointerException();
- }
- Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
- final Connection conn = DriverManager.getConnection(getUrl(), props);
- ResultSet rs = null ;
- try{
- rs = conn.createStatement().executeQuery("select count(*) from
"+fullTableName);
- int rowsCount = 0;
- while(rs.next()) {
- rowsCount = rs.getInt(1);
- }
- return rowsCount;
-
- } finally {
- if(rs != null) {
- rs.close();
- }
- if(conn != null) {
- conn.close();
- }
- }
-
-
- }
-
-}
diff --git
a/phoenix5-flume/src/it/java/org/apache/phoenix/flume/serializer/CustomSerializer.java
b/phoenix5-flume/src/it/java/org/apache/phoenix/flume/serializer/CustomSerializer.java
deleted file mode 100644
index 5db5fa6..0000000
---
a/phoenix5-flume/src/it/java/org/apache/phoenix/flume/serializer/CustomSerializer.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.flume.serializer;
-
-import java.sql.SQLException;
-import java.util.List;
-
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-
-public class CustomSerializer extends BaseEventSerializer {
- @Override
- public void doConfigure(Context context) {
-
- }
-
- @Override
- public void doInitialize() throws SQLException {
-
- }
-
- @Override
- public void upsertEvents(List<Event> events) throws SQLException {
- // Just execute a sample UPSERT
- connection.createStatement().execute("UPSERT INTO
FLUME_TEST_EXTENDED(ID, COUNTS) VALUES(1, 1)");
- connection.commit();
- }
-}
diff --git
a/phoenix5-flume/src/it/java/org/apache/phoenix/flume/sink/NullPhoenixSink.java
b/phoenix5-flume/src/it/java/org/apache/phoenix/flume/sink/NullPhoenixSink.java
deleted file mode 100644
index 1df52e1..0000000
---
a/phoenix5-flume/src/it/java/org/apache/phoenix/flume/sink/NullPhoenixSink.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.flume.sink;
-
-public class NullPhoenixSink extends PhoenixSink {
-}
diff --git
a/phoenix5-flume/src/main/java/org/apache/phoenix/flume/DefaultKeyGenerator.java
b/phoenix5-flume/src/main/java/org/apache/phoenix/flume/DefaultKeyGenerator.java
deleted file mode 100644
index 3820c2a..0000000
---
a/phoenix5-flume/src/main/java/org/apache/phoenix/flume/DefaultKeyGenerator.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.flume;
-
-import java.sql.Date;
-import java.sql.Timestamp;
-import java.util.Random;
-
-import org.apache.phoenix.util.DateUtil;
-
-public enum DefaultKeyGenerator implements KeyGenerator {
-
- UUID {
-
- @Override
- public String generate() {
- return String.valueOf(java.util.UUID.randomUUID());
- }
-
- },
- TIMESTAMP {
-
- @Override
- public String generate() {
- java.sql.Timestamp ts = new Timestamp(System.currentTimeMillis());
- return DateUtil.DEFAULT_DATE_FORMATTER.format(ts);
- }
-
- },
- DATE {
-
- @Override
- public String generate() {
- Date dt = new Date(System.currentTimeMillis());
- return DateUtil.DEFAULT_DATE_FORMATTER.format(dt);
- }
- },
- RANDOM {
-
- @Override
- public String generate() {
- return String.valueOf(new Random().nextLong());
- }
-
- },
- NANOTIMESTAMP {
-
- @Override
- public String generate() {
- return String.valueOf(System.nanoTime());
- }
-
- };
-}
diff --git
a/phoenix5-flume/src/main/java/org/apache/phoenix/flume/FlumeConstants.java
b/phoenix5-flume/src/main/java/org/apache/phoenix/flume/FlumeConstants.java
deleted file mode 100644
index a146bbe..0000000
--- a/phoenix5-flume/src/main/java/org/apache/phoenix/flume/FlumeConstants.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.flume;
-
-public final class FlumeConstants {
-
- /**
- * The Hbase table which the sink should write to.
- */
- public static final String CONFIG_TABLE = "table";
- /**
- * The ddl query for the Hbase table where events are ingested to.
- */
- public static final String CONFIG_TABLE_DDL = "ddl";
- /**
- * Maximum number of events the sink should take from the channel per
transaction, if available.
- */
- public static final String CONFIG_BATCHSIZE = "batchSize";
- /**
- * The fully qualified class name of the serializer the sink should use.
- */
- public static final String CONFIG_SERIALIZER = "serializer";
- /**
- * Configuration to pass to the serializer.
- */
- public static final String CONFIG_SERIALIZER_PREFIX = CONFIG_SERIALIZER +
".";
-
- /**
- * Configuration for the zookeeper quorum.
- */
- public static final String CONFIG_ZK_QUORUM = "zookeeperQuorum";
-
- /**
- * Configuration for the jdbc url.
- */
- public static final String CONFIG_JDBC_URL = "jdbcUrl";
-
- /**
- * Default batch size .
- */
- public static final Integer DEFAULT_BATCH_SIZE = 100;
-
- /** Regular expression used to parse groups from event data. */
- public static final String CONFIG_REGULAR_EXPRESSION = "regex";
- public static final String REGEX_DEFAULT = "(.*)";
-
- /** Whether to ignore case when performing regex matches. */
- public static final String IGNORE_CASE_CONFIG = "regexIgnoreCase";
- public static final boolean IGNORE_CASE_DEFAULT = false;
-
- /** JSON expression used to parse groups from event data. */
- public static final String CONFIG_COLUMNS_MAPPING = "columnsMapping";
- public static final String CONFIG_PARTIAL_SCHEMA = "partialSchema";
- public static final String JSON_DEFAULT = "{}";
-
- /** CSV expression used to parse groups from event data. */
- public static final String CSV_DELIMITER = "csvDelimiter";
- public static final String CSV_DELIMITER_DEFAULT = ",";
- public static final String CSV_QUOTE = "csvQuote";
- public static final String CSV_QUOTE_DEFAULT = "\"";
- public static final String CSV_ESCAPE = "csvEscape";
- public static final String CSV_ESCAPE_DEFAULT = "\\";
- public static final String CSV_ARRAY_DELIMITER = "csvArrayDelimiter";
- public static final String CSV_ARRAY_DELIMITER_DEFAULT = ",";
-
- /** Comma separated list of column names . */
- public static final String CONFIG_COLUMN_NAMES = "columns";
-
- /** The header columns to persist as columns into the default column
family. */
- public static final String CONFIG_HEADER_NAMES = "headers";
-
- /** The rowkey type generator . */
- public static final String CONFIG_ROWKEY_TYPE_GENERATOR = "rowkeyType";
-
- /**
- * The default delimiter for columns and headers
- */
- public static final String DEFAULT_COLUMNS_DELIMITER = ",";
-}
diff --git
a/phoenix5-flume/src/main/java/org/apache/phoenix/flume/KeyGenerator.java
b/phoenix5-flume/src/main/java/org/apache/phoenix/flume/KeyGenerator.java
deleted file mode 100644
index d823a56..0000000
--- a/phoenix5-flume/src/main/java/org/apache/phoenix/flume/KeyGenerator.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.flume;
-
-public interface KeyGenerator {
-
- public String generate();
-}
-
diff --git
a/phoenix5-flume/src/main/java/org/apache/phoenix/flume/SchemaHandler.java
b/phoenix5-flume/src/main/java/org/apache/phoenix/flume/SchemaHandler.java
deleted file mode 100644
index 206b3e5..0000000
--- a/phoenix5-flume/src/main/java/org/apache/phoenix/flume/SchemaHandler.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.flume;
-
-import java.sql.Connection;
-import java.sql.SQLException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class SchemaHandler {
-
- private static final Logger logger =
LoggerFactory.getLogger(SchemaHandler.class);
-
- public static boolean createTable(Connection connection, String
createTableDdl) {
- if (connection == null) {
- throw new NullPointerException();
- }
- if (createTableDdl == null) {
- throw new NullPointerException();
- }
- boolean status = true;
- try {
- status = connection.createStatement().execute(createTableDdl);
- } catch (SQLException e) {
- logger.error("An error occurred during executing the create table
ddl {} ",createTableDdl);
- throw new RuntimeException(e);
- }
- return status;
-
- }
-
-}
diff --git
a/phoenix5-flume/src/main/java/org/apache/phoenix/flume/serializer/BaseEventSerializer.java
b/phoenix5-flume/src/main/java/org/apache/phoenix/flume/serializer/BaseEventSerializer.java
deleted file mode 100644
index 01836a9..0000000
---
a/phoenix5-flume/src/main/java/org/apache/phoenix/flume/serializer/BaseEventSerializer.java
+++ /dev/null
@@ -1,252 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.flume.serializer;
-
-import static org.apache.phoenix.flume.FlumeConstants.CONFIG_COLUMN_NAMES;
-import static org.apache.phoenix.flume.FlumeConstants.CONFIG_HEADER_NAMES;
-import static
org.apache.phoenix.flume.FlumeConstants.CONFIG_ROWKEY_TYPE_GENERATOR;
-import static
org.apache.phoenix.flume.FlumeConstants.DEFAULT_COLUMNS_DELIMITER;
-import static org.apache.phoenix.util.PhoenixRuntime.UPSERT_BATCH_SIZE_ATTRIB;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-import org.apache.flume.Context;
-import org.apache.flume.conf.ComponentConfiguration;
-import org.apache.phoenix.exception.SQLExceptionCode;
-import org.apache.phoenix.exception.SQLExceptionInfo;
-import org.apache.phoenix.flume.DefaultKeyGenerator;
-import org.apache.phoenix.flume.FlumeConstants;
-import org.apache.phoenix.flume.KeyGenerator;
-import org.apache.phoenix.flume.SchemaHandler;
-import org.apache.phoenix.util.ColumnInfo;
-import org.apache.phoenix.util.QueryUtil;
-import org.apache.phoenix.util.SchemaUtil;
-import org.apache.phoenix.util.StringUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-public abstract class BaseEventSerializer implements EventSerializer {
-
- private static final Logger logger =
LoggerFactory.getLogger(BaseEventSerializer.class);
-
- protected Connection connection;
- protected String fullTableName;
- protected ColumnInfo[] columnMetadata;
- protected boolean autoGenerateKey = false;
- protected KeyGenerator keyGenerator;
- protected List<String> colNames = new ArrayList<>(10);
- protected List<String> headers = new ArrayList<>(5);
- protected String upsertStatement;
- private String jdbcUrl;
- private Integer batchSize;
- private String createTableDdl;
-
-
-
-
-
- @Override
- public void configure(Context context) {
-
- this.createTableDdl =
context.getString(FlumeConstants.CONFIG_TABLE_DDL);
- this.fullTableName = context.getString(FlumeConstants.CONFIG_TABLE);
- final String zookeeperQuorum =
context.getString(FlumeConstants.CONFIG_ZK_QUORUM);
- final String ipJdbcURL =
context.getString(FlumeConstants.CONFIG_JDBC_URL);
- this.batchSize = context.getInteger(FlumeConstants.CONFIG_BATCHSIZE,
FlumeConstants.DEFAULT_BATCH_SIZE);
- final String columnNames = context.getString(CONFIG_COLUMN_NAMES);
- final String headersStr = context.getString(CONFIG_HEADER_NAMES);
- final String keyGeneratorType =
context.getString(CONFIG_ROWKEY_TYPE_GENERATOR);
-
- if (this.fullTableName == null) {
- throw new NullPointerException(
- "Table name cannot be empty, please specify in the
configuration file");
- }
- if(zookeeperQuorum != null && !zookeeperQuorum.isEmpty()) {
- this.jdbcUrl = QueryUtil.getUrl(zookeeperQuorum);
- }
- if(ipJdbcURL != null && !ipJdbcURL.isEmpty()) {
- this.jdbcUrl = ipJdbcURL;
- }
- if (this.jdbcUrl == null) {
- throw new NullPointerException(
- "Please specify either the zookeeper quorum or the jdbc url in
the configuration file");
- }
- if (columnNames == null) {
- throw new NullPointerException(
- "Column names cannot be empty, please specify in configuration
file");
- }
-
colNames.addAll(Arrays.asList(columnNames.split(DEFAULT_COLUMNS_DELIMITER)));
-
- if(headersStr != null && !headersStr.isEmpty()) {
-
headers.addAll(Arrays.asList(headersStr.split(DEFAULT_COLUMNS_DELIMITER)));
- }
-
- if(keyGeneratorType != null && !keyGeneratorType.isEmpty()) {
- try {
- keyGenerator =
DefaultKeyGenerator.valueOf(keyGeneratorType.toUpperCase());
- this.autoGenerateKey = true;
- } catch(IllegalArgumentException iae) {
- logger.error("An invalid key generator {} was specified in
configuration file. Specify one of
{}",keyGeneratorType,DefaultKeyGenerator.values());
- throw new RuntimeException(iae);
- }
- }
-
- logger.debug(" the jdbcUrl configured is {}",jdbcUrl);
- logger.debug(" columns configured are {}",colNames.toString());
- logger.debug(" headers configured are {}",headersStr);
- logger.debug(" the keyGenerator configured is {} ",keyGeneratorType);
-
- doConfigure(context);
-
- }
-
- @Override
- public void configure(ComponentConfiguration conf) {
- // NO-OP
-
- }
-
-
- @Override
- public void initialize() throws SQLException {
- final Properties props = new Properties();
- props.setProperty(UPSERT_BATCH_SIZE_ATTRIB,
String.valueOf(this.batchSize));
- ResultSet rs = null;
- try {
- this.connection = DriverManager.getConnection(this.jdbcUrl, props);
- this.connection.setAutoCommit(false);
- if(this.createTableDdl != null) {
- SchemaHandler.createTable(connection,createTableDdl);
- }
-
-
- final Map<String,Integer> qualifiedColumnMap = new
LinkedHashMap<>();
- final Map<String,Integer> unqualifiedColumnMap = new
LinkedHashMap<>();
- final String schemaName =
SchemaUtil.getSchemaNameFromFullName(fullTableName);
- final String tableName =
SchemaUtil.getTableNameFromFullName(fullTableName);
-
- String rowkey = null;
- String cq = null;
- String cf = null;
- Integer dt = null;
- rs = connection.getMetaData().getColumns("",
StringUtil.escapeLike(SchemaUtil.normalizeIdentifier(schemaName)),
StringUtil.escapeLike(SchemaUtil.normalizeIdentifier(tableName)), null);
- while (rs.next()) {
- cf = rs.getString(QueryUtil.COLUMN_FAMILY_POSITION);
- cq = rs.getString(QueryUtil.COLUMN_NAME_POSITION);
- // TODO: Fix this .. change `DATA_TYPE_POSITION` value 5 to 26
- // dt = rs.getInt(QueryUtil.DATA_TYPE_POSITION);
- dt = rs.getInt(26);
- if(cf == null || cf.isEmpty()) {
- rowkey = cq; // this is required only when row key is auto
generated
- } else {
- qualifiedColumnMap.put(SchemaUtil.getColumnDisplayName(cf,
cq), dt);
- }
- unqualifiedColumnMap.put(SchemaUtil.getColumnDisplayName(null,
cq), dt);
- }
-
- //can happen when table not found in Hbase.
- if(unqualifiedColumnMap.isEmpty()) {
- throw new
SQLExceptionInfo.Builder(SQLExceptionCode.TABLE_UNDEFINED)
- .setTableName(tableName).build().buildException();
- }
-
- int colSize = colNames.size();
- int headersSize = headers.size();
- int totalSize = colSize + headersSize + ( autoGenerateKey ? 1 : 0);
- columnMetadata = new ColumnInfo[totalSize] ;
-
- int position = 0;
- position = this.addToColumnMetadataInfo(colNames,
qualifiedColumnMap, unqualifiedColumnMap, position);
- position = this.addToColumnMetadataInfo(headers,
qualifiedColumnMap, unqualifiedColumnMap, position);
-
- if(autoGenerateKey) {
- Integer sqlType = unqualifiedColumnMap.get(rowkey);
- if (sqlType == null) {
- throw new
SQLExceptionInfo.Builder(SQLExceptionCode.PRIMARY_KEY_MISSING)
-
.setColumnName(rowkey).setTableName(fullTableName).build().buildException();
- }
- columnMetadata[position] = new ColumnInfo(rowkey, sqlType);
- position++;
- }
-
- this.upsertStatement =
QueryUtil.constructUpsertStatement(fullTableName,
Arrays.asList(columnMetadata));
- logger.info(" the upsert statement is {} " ,this.upsertStatement);
-
- } catch (SQLException e) {
- logger.error("error {} occurred during initializing connection
",e.getMessage());
- throw e;
- } finally {
- if(rs != null) {
- rs.close();
- }
- }
- doInitialize();
- }
-
- private int addToColumnMetadataInfo(final List<String> columns , final
Map<String,Integer> qualifiedColumnsInfoMap, Map<String, Integer>
unqualifiedColumnsInfoMap, int position) throws SQLException {
- if (columns == null) {
- throw new NullPointerException();
- }
- if (qualifiedColumnsInfoMap == null) {
- throw new NullPointerException();
- }
- if (unqualifiedColumnsInfoMap == null) {
- throw new NullPointerException();
- }
- for (int i = 0 ; i < columns.size() ; i++) {
- String columnName =
SchemaUtil.normalizeIdentifier(columns.get(i).trim());
- Integer sqlType = unqualifiedColumnsInfoMap.get(columnName);
- if (sqlType == null) {
- sqlType = qualifiedColumnsInfoMap.get(columnName);
- if (sqlType == null) {
- throw new
SQLExceptionInfo.Builder(SQLExceptionCode.COLUMN_NOT_FOUND)
-
.setColumnName(columnName).setTableName(this.fullTableName).build().buildException();
- }
- }
- columnMetadata[position] = new ColumnInfo(columnName, sqlType);
- position++;
- }
- return position;
- }
-
- public abstract void doConfigure(Context context);
-
- public abstract void doInitialize() throws SQLException;
-
-
- @Override
- public void close() {
- if(connection != null) {
- try {
- connection.close();
- } catch (SQLException e) {
- logger.error(" Error while closing connection {} ");
- }
- }
- }
-}
diff --git
a/phoenix5-flume/src/main/java/org/apache/phoenix/flume/serializer/CsvEventSerializer.java
b/phoenix5-flume/src/main/java/org/apache/phoenix/flume/serializer/CsvEventSerializer.java
deleted file mode 100644
index 760b9c7..0000000
---
a/phoenix5-flume/src/main/java/org/apache/phoenix/flume/serializer/CsvEventSerializer.java
+++ /dev/null
@@ -1,209 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.flume.serializer;
-
-import static org.apache.phoenix.flume.FlumeConstants.CSV_DELIMITER;
-import static org.apache.phoenix.flume.FlumeConstants.CSV_DELIMITER_DEFAULT;
-import static org.apache.phoenix.flume.FlumeConstants.CSV_QUOTE;
-import static org.apache.phoenix.flume.FlumeConstants.CSV_QUOTE_DEFAULT;
-import static org.apache.phoenix.flume.FlumeConstants.CSV_ESCAPE;
-import static org.apache.phoenix.flume.FlumeConstants.CSV_ESCAPE_DEFAULT;
-import static org.apache.phoenix.flume.FlumeConstants.CSV_ARRAY_DELIMITER;
-import static
org.apache.phoenix.flume.FlumeConstants.CSV_ARRAY_DELIMITER_DEFAULT;
-
-import java.io.IOException;
-import java.io.StringReader;
-import java.sql.Array;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.csv.CSVFormat;
-import org.apache.commons.csv.CSVParser;
-import org.apache.commons.csv.CSVRecord;
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.phoenix.schema.types.PDataType;
-import org.json.JSONArray;
-import org.json.JSONTokener;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-
-public class CsvEventSerializer extends BaseEventSerializer {
-
- private static final Logger logger =
LoggerFactory.getLogger(CsvEventSerializer.class);
-
- private String csvDelimiter;
- private String csvQuote;
- private String csvEscape;
- private String csvArrayDelimiter;
- private CsvLineParser csvLineParser;
-
- /**
- *
- */
- @Override
- public void doConfigure(Context context) {
- csvDelimiter = context.getString(CSV_DELIMITER,
CSV_DELIMITER_DEFAULT);
- csvQuote = context.getString(CSV_QUOTE, CSV_QUOTE_DEFAULT);
- csvEscape = context.getString(CSV_ESCAPE, CSV_ESCAPE_DEFAULT);
- csvArrayDelimiter = context.getString(CSV_ARRAY_DELIMITER,
CSV_ARRAY_DELIMITER_DEFAULT);
- csvLineParser = new
CsvLineParser(csvDelimiter.toCharArray()[0], csvQuote.toCharArray()[0],
- csvEscape.toCharArray()[0]);
- }
-
- /**
- *
- */
- @Override
- public void doInitialize() throws SQLException {
- // NO-OP
- }
-
- @Override
- public void upsertEvents(List<Event> events) throws SQLException {
- if(events == null){
- throw new NullPointerException();
- }
- if(connection == null){
- throw new NullPointerException();
- }
- if(this.upsertStatement == null){
- throw new NullPointerException();
- }
-
- boolean wasAutoCommit = connection.getAutoCommit();
- connection.setAutoCommit(false);
- try (PreparedStatement colUpsert =
connection.prepareStatement(upsertStatement)) {
- String value = null;
- Integer sqlType = null;
- for (Event event : events) {
- byte[] payloadBytes = event.getBody();
- if (payloadBytes == null || payloadBytes.length
== 0) {
- continue;
- }
- String payload = new String(payloadBytes);
- CSVRecord csvRecord =
csvLineParser.parse(payload);
- if (colNames.size() != csvRecord.size()) {
- logger.debug("payload data {} doesn't
match the fields mapping {} ", payload, colNames);
- continue;
- }
- Map<String, String> data = new HashMap<String,
String>();
- for (int i = 0; i < csvRecord.size(); i++) {
- data.put(colNames.get(i),
csvRecord.get(i));
- }
- Collection<String> values = data.values();
- if (values.contains(null)) {
- logger.debug("payload data {} doesn't
match the fields mapping {} ", payload, colNames);
- continue;
- }
-
- int index = 1;
- int offset = 0;
- for (int i = 0; i < colNames.size(); i++,
offset++) {
- if (columnMetadata[offset] == null) {
- continue;
- }
- String colName = colNames.get(i);
- value = data.get(colName);
- sqlType =
columnMetadata[offset].getSqlType();
- PDataType pDataType =
PDataType.fromTypeId(sqlType);
- Object upsertValue;
- if (pDataType.isArrayType()) {
- String arrayJson =
Arrays.toString(value.split(csvArrayDelimiter));
- JSONArray jsonArray = new
JSONArray(new JSONTokener(arrayJson));
- Object[] vals = new
Object[jsonArray.length()];
- for (int x = 0; x <
jsonArray.length(); x++) {
- vals[x] =
jsonArray.get(x);
- }
- String baseTypeSqlName =
PDataType.arrayBaseType(pDataType).getSqlTypeName();
- Array array =
connection.createArrayOf(baseTypeSqlName, vals);
- upsertValue =
pDataType.toObject(array, pDataType);
- } else {
- upsertValue =
pDataType.toObject(value);
- }
- if (upsertValue != null) {
- colUpsert.setObject(index++,
upsertValue, sqlType);
- } else {
- colUpsert.setNull(index++,
sqlType);
- }
- }
-
- // add headers if necessary
- Map<String, String> headerValues =
event.getHeaders();
- for (int i = 0; i < headers.size(); i++,
offset++) {
- String headerName = headers.get(i);
- String headerValue =
headerValues.get(headerName);
- sqlType =
columnMetadata[offset].getSqlType();
- Object upsertValue =
PDataType.fromTypeId(sqlType).toObject(headerValue);
- if (upsertValue != null) {
- colUpsert.setObject(index++,
upsertValue, sqlType);
- } else {
- colUpsert.setNull(index++,
sqlType);
- }
- }
-
- if (autoGenerateKey) {
- sqlType =
columnMetadata[offset].getSqlType();
- String generatedRowValue =
this.keyGenerator.generate();
- Object rowkeyValue =
PDataType.fromTypeId(sqlType).toObject(generatedRowValue);
- colUpsert.setObject(index++,
rowkeyValue, sqlType);
- }
- colUpsert.execute();
- }
- connection.commit();
- } catch (Exception ex) {
- logger.error("An error {} occurred during persisting
the event ", ex.getMessage());
- throw new SQLException(ex.getMessage());
- } finally {
- if (wasAutoCommit) {
- connection.setAutoCommit(true);
- }
- }
-
- }
-
- static class CsvLineParser {
- private final CSVFormat csvFormat;
-
- CsvLineParser(char fieldDelimiter, char quote, char escape) {
- this.csvFormat =
CSVFormat.DEFAULT.withIgnoreEmptyLines(true).withDelimiter(fieldDelimiter)
- .withEscape(escape).withQuote(quote);
- }
-
- public CSVRecord parse(String input) throws IOException {
- CSVParser csvParser = new CSVParser(new
StringReader(input), this.csvFormat);
- CSVRecord record;
- try{
- record = csvParser.iterator().next();
- } catch (Exception e) {
- record = null;
- }
-
- return record;
-
- }
- }
-
-}
\ No newline at end of file
diff --git
a/phoenix5-flume/src/main/java/org/apache/phoenix/flume/serializer/EventSerializer.java
b/phoenix5-flume/src/main/java/org/apache/phoenix/flume/serializer/EventSerializer.java
deleted file mode 100644
index 80959f5..0000000
---
a/phoenix5-flume/src/main/java/org/apache/phoenix/flume/serializer/EventSerializer.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.flume.serializer;
-
-import java.sql.SQLException;
-import java.util.List;
-
-import org.apache.flume.Event;
-import org.apache.flume.conf.Configurable;
-import org.apache.flume.conf.ConfigurableComponent;
-
-import org.apache.phoenix.util.SQLCloseable;
-
-public interface EventSerializer extends
Configurable,ConfigurableComponent,SQLCloseable {
-
- /**
- * called during the start of the process to initialize the table columns.
- */
- public void initialize() throws SQLException;
-
- /**
- * @param events to be written to HBase.
- * @throws SQLException
- */
- public void upsertEvents(List<Event> events) throws SQLException;
-
-}
diff --git
a/phoenix5-flume/src/main/java/org/apache/phoenix/flume/serializer/EventSerializers.java
b/phoenix5-flume/src/main/java/org/apache/phoenix/flume/serializer/EventSerializers.java
deleted file mode 100644
index 8c99d7d..0000000
---
a/phoenix5-flume/src/main/java/org/apache/phoenix/flume/serializer/EventSerializers.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.flume.serializer;
-
-public enum EventSerializers {
-
- REGEX(RegexEventSerializer.class.getName()),
JSON(JsonEventSerializer.class.getName()),
CSV(CsvEventSerializer.class.getName());
-
- private final String className;
-
- private EventSerializers(String serializerClassName) {
- this.className = serializerClassName;
- }
-
- /**
- * @return Returns the serializer className.
- */
- public String getClassName() {
- return className;
- }
-}
\ No newline at end of file
diff --git
a/phoenix5-flume/src/main/java/org/apache/phoenix/flume/serializer/JsonEventSerializer.java
b/phoenix5-flume/src/main/java/org/apache/phoenix/flume/serializer/JsonEventSerializer.java
deleted file mode 100644
index 24ebea8..0000000
---
a/phoenix5-flume/src/main/java/org/apache/phoenix/flume/serializer/JsonEventSerializer.java
+++ /dev/null
@@ -1,232 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.flume.serializer;
-
-import static org.apache.phoenix.flume.FlumeConstants.JSON_DEFAULT;
-import static org.apache.phoenix.flume.FlumeConstants.CONFIG_COLUMNS_MAPPING;
-import static org.apache.phoenix.flume.FlumeConstants.CONFIG_PARTIAL_SCHEMA;
-
-import java.sql.Array;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.phoenix.schema.types.PDataType;
-import org.json.JSONArray;
-import org.json.JSONException;
-import org.json.JSONObject;
-import org.json.JSONTokener;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.jayway.jsonpath.Configuration;
-import com.jayway.jsonpath.JsonPath;
-import com.jayway.jsonpath.spi.json.JsonOrgJsonProvider;
-import com.jayway.jsonpath.spi.mapper.JsonOrgMappingProvider;
-
-public class JsonEventSerializer extends BaseEventSerializer {
-
- private static final Logger logger =
LoggerFactory.getLogger(JsonEventSerializer.class);
-
- private JSONObject jsonSchema;
- private boolean isProperMapping;
- private boolean partialSchema;
-
- /**
- *
- */
- @Override
- public void doConfigure(Context context) {
- final String jsonData =
context.getString(CONFIG_COLUMNS_MAPPING, JSON_DEFAULT);
- try {
- jsonSchema = new JSONObject(jsonData);
- if (jsonSchema.length() == 0) {
- for (String colName : colNames) {
- jsonSchema.put(colName, colName);
- }
- isProperMapping = true;
- } else {
- Iterator<String> keys = jsonSchema.keys();
- List<String> keylist = new ArrayList<String>();
- while (keys.hasNext()) {
- keylist.add(keys.next());
- }
- isProperMapping =
CollectionUtils.isEqualCollection(keylist, colNames);
- }
- } catch (JSONException e) {
- e.printStackTrace();
- logger.debug("json mapping not proper, verify the data
{} ", jsonData);
- }
- partialSchema = context.getBoolean(CONFIG_PARTIAL_SCHEMA,
false);
- }
-
- /**
- *
- */
- @Override
- public void doInitialize() throws SQLException {
- // NO-OP
- }
-
- @Override
- public void upsertEvents(List<Event> events) throws SQLException {
- if (events == null){
- throw new NullPointerException();
- }
- if (connection == null){
- throw new NullPointerException();
- }
- if (this.upsertStatement == null){
- throw new NullPointerException();
- }
- if (!isProperMapping) {
- throw new IllegalArgumentException("Please verify
fields mapping is not properly done..");
- }
- boolean wasAutoCommit = connection.getAutoCommit();
- connection.setAutoCommit(false);
- try (PreparedStatement colUpsert =
connection.prepareStatement(upsertStatement)) {
- String value = null;
- Integer sqlType = null;
- JSONObject inputJson = new JSONObject();
- for (Event event : events) {
- byte[] payloadBytes = event.getBody();
- if (payloadBytes == null || payloadBytes.length
== 0) {
- continue;
- }
- String payload = new String(payloadBytes);
-
- try {
- inputJson = new JSONObject(payload);
- } catch (Exception e) {
- logger.debug("payload is not proper
json");
- continue;
- }
-
- Map<String, String> data = new HashMap<String,
String>();
- for (String colName : colNames) {
- String pattern = colName;
- if (jsonSchema.has(colName)) {
- Object obj =
jsonSchema.opt(colName);
- if (null != obj) {
- pattern =
obj.toString();
- }
- }
- pattern = "$." + pattern;
- value = getPatternData(inputJson,
pattern);
-
- // if field mapping data is null then
look for column data
- if (null == value && partialSchema) {
- pattern = "$." + colName;
- value =
getPatternData(inputJson, pattern);
- }
-
- data.put(colName, value);
- }
-
- Collection<String> values = data.values();
- if (values.contains(null)) {
- logger.debug("payload data {} doesn't
match the fields mapping {} ", inputJson, jsonSchema);
- continue;
- }
-
- int index = 1;
- int offset = 0;
- for (int i = 0; i < colNames.size(); i++,
offset++) {
- if (columnMetadata[offset] == null) {
- continue;
- }
- String colName = colNames.get(i);
- value = data.get(colName);
- sqlType =
columnMetadata[offset].getSqlType();
- PDataType pDataType =
PDataType.fromTypeId(sqlType);
- Object upsertValue;
- if (pDataType.isArrayType()) {
- JSONArray jsonArray = new
JSONArray(new JSONTokener(value));
- Object[] vals = new
Object[jsonArray.length()];
- for (int x = 0; x <
jsonArray.length(); x++) {
- vals[x] =
jsonArray.get(x);
- }
- String baseTypeSqlName =
PDataType.arrayBaseType(pDataType).getSqlTypeName();
- Array array =
connection.createArrayOf(baseTypeSqlName, vals);
- upsertValue =
pDataType.toObject(array, pDataType);
- } else {
- upsertValue =
pDataType.toObject(value);
- }
- if (upsertValue != null) {
- colUpsert.setObject(index++,
upsertValue, sqlType);
- } else {
- colUpsert.setNull(index++,
sqlType);
- }
- }
-
- // add headers if necessary
- Map<String, String> headerValues =
event.getHeaders();
- for (int i = 0; i < headers.size(); i++,
offset++) {
- String headerName = headers.get(i);
- String headerValue =
headerValues.get(headerName);
- sqlType =
columnMetadata[offset].getSqlType();
- Object upsertValue =
PDataType.fromTypeId(sqlType).toObject(headerValue);
- if (upsertValue != null) {
- colUpsert.setObject(index++,
upsertValue, sqlType);
- } else {
- colUpsert.setNull(index++,
sqlType);
- }
- }
-
- if (autoGenerateKey) {
- sqlType =
columnMetadata[offset].getSqlType();
- String generatedRowValue =
this.keyGenerator.generate();
- Object rowkeyValue =
PDataType.fromTypeId(sqlType).toObject(generatedRowValue);
- colUpsert.setObject(index++,
rowkeyValue, sqlType);
- }
- colUpsert.execute();
- }
- connection.commit();
- } catch (Exception ex) {
- logger.error("An error {} occurred during persisting
the event ", ex.getMessage());
- throw new SQLException(ex.getMessage());
- } finally {
- if (wasAutoCommit) {
- connection.setAutoCommit(true);
- }
- }
-
- }
-
- private String getPatternData(JSONObject json, String pattern) {
- Configuration JSON_ORG_CONFIGURATION =
Configuration.builder().mappingProvider(new JsonOrgMappingProvider())
- .jsonProvider(new
JsonOrgJsonProvider()).build();
- String value;
- try {
- Object object =
JsonPath.using(JSON_ORG_CONFIGURATION).parse(json).read(pattern);
- value = object.toString();
- } catch (Exception e) {
- value = null;
- }
- return value;
- }
-
-}
\ No newline at end of file
diff --git
a/phoenix5-flume/src/main/java/org/apache/phoenix/flume/serializer/RegexEventSerializer.java
b/phoenix5-flume/src/main/java/org/apache/phoenix/flume/serializer/RegexEventSerializer.java
deleted file mode 100644
index bcf7bed..0000000
---
a/phoenix5-flume/src/main/java/org/apache/phoenix/flume/serializer/RegexEventSerializer.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.flume.serializer;
-
-import static
org.apache.phoenix.flume.FlumeConstants.CONFIG_REGULAR_EXPRESSION;
-import static org.apache.phoenix.flume.FlumeConstants.IGNORE_CASE_CONFIG;
-import static org.apache.phoenix.flume.FlumeConstants.IGNORE_CASE_DEFAULT;
-import static org.apache.phoenix.flume.FlumeConstants.REGEX_DEFAULT;
-
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-import java.util.List;
-import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.phoenix.schema.types.PDataType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-public class RegexEventSerializer extends BaseEventSerializer {
-
- private static final Logger logger =
LoggerFactory.getLogger(RegexEventSerializer.class);
-
- private Pattern inputPattern;
-
- /**
- *
- */
- @Override
- public void doConfigure(Context context) {
- final String regex = context.getString(CONFIG_REGULAR_EXPRESSION,
REGEX_DEFAULT);
- final boolean regexIgnoreCase =
context.getBoolean(IGNORE_CASE_CONFIG,IGNORE_CASE_DEFAULT);
- inputPattern = Pattern.compile(regex, Pattern.DOTALL +
(regexIgnoreCase ? Pattern.CASE_INSENSITIVE : 0));
- }
-
-
- /**
- *
- */
- @Override
- public void doInitialize() throws SQLException {
- // NO-OP
- }
-
-
- @Override
- public void upsertEvents(List<Event> events) throws SQLException {
- if (events == null){
- throw new NullPointerException();
- }
- if (connection == null){
- throw new NullPointerException();
- }
- if (this.upsertStatement == null){
- throw new NullPointerException();
- }
-
- boolean wasAutoCommit = connection.getAutoCommit();
- connection.setAutoCommit(false);
- try (PreparedStatement colUpsert =
connection.prepareStatement(upsertStatement)) {
- String value = null;
- Integer sqlType = null;
- for(Event event : events) {
- byte [] payloadBytes = event.getBody();
- if(payloadBytes == null || payloadBytes.length == 0) {
- continue;
- }
- String payload = new String(payloadBytes);
- Matcher m = inputPattern.matcher(payload.trim());
-
- if (!m.matches()) {
- logger.debug("payload {} doesn't match the pattern {} ",
payload, inputPattern.toString());
- continue;
- }
- if (m.groupCount() != colNames.size()) {
- logger.debug("payload {} size doesn't match the pattern {} ",
m.groupCount(), colNames.size());
- continue;
- }
- int index = 1 ;
- int offset = 0;
- for (int i = 0 ; i < colNames.size() ; i++,offset++) {
- if (columnMetadata[offset] == null ) {
- continue;
- }
-
- value = m.group(i + 1);
- sqlType = columnMetadata[offset].getSqlType();
- Object upsertValue =
PDataType.fromTypeId(sqlType).toObject(value);
- if (upsertValue != null) {
- colUpsert.setObject(index++, upsertValue, sqlType);
- } else {
- colUpsert.setNull(index++, sqlType);
- }
- }
-
- //add headers if necessary
- Map<String,String> headerValues = event.getHeaders();
- for(int i = 0 ; i < headers.size() ; i++ , offset++) {
-
- String headerName = headers.get(i);
- String headerValue = headerValues.get(headerName);
- sqlType = columnMetadata[offset].getSqlType();
- Object upsertValue =
PDataType.fromTypeId(sqlType).toObject(headerValue);
- if (upsertValue != null) {
- colUpsert.setObject(index++, upsertValue, sqlType);
- } else {
- colUpsert.setNull(index++, sqlType);
- }
- }
-
- if(autoGenerateKey) {
- sqlType = columnMetadata[offset].getSqlType();
- String generatedRowValue = this.keyGenerator.generate();
- Object rowkeyValue =
PDataType.fromTypeId(sqlType).toObject(generatedRowValue);
- colUpsert.setObject(index++, rowkeyValue ,sqlType);
- }
- colUpsert.execute();
- }
- connection.commit();
- } catch(Exception ex){
- logger.error("An error {} occurred during persisting the event
",ex.getMessage());
- throw new SQLException(ex.getMessage());
- } finally {
- if(wasAutoCommit) {
- connection.setAutoCommit(true);
- }
- }
-
- }
-
-}
diff --git
a/phoenix5-flume/src/main/java/org/apache/phoenix/flume/sink/PhoenixSink.java
b/phoenix5-flume/src/main/java/org/apache/phoenix/flume/sink/PhoenixSink.java
deleted file mode 100644
index 656ff43..0000000
---
a/phoenix5-flume/src/main/java/org/apache/phoenix/flume/sink/PhoenixSink.java
+++ /dev/null
@@ -1,221 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.flume.sink;
-
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.flume.Channel;
-import org.apache.flume.ChannelException;
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.EventDeliveryException;
-import org.apache.flume.Transaction;
-import org.apache.flume.conf.Configurable;
-import org.apache.flume.instrumentation.SinkCounter;
-import org.apache.flume.sink.AbstractSink;
-import org.apache.phoenix.flume.FlumeConstants;
-import org.apache.phoenix.flume.serializer.EventSerializer;
-import org.apache.phoenix.flume.serializer.EventSerializers;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-public class PhoenixSink extends AbstractSink implements Configurable {
- private static final Logger logger =
LoggerFactory.getLogger(PhoenixSink.class);
- private static AtomicInteger counter = new AtomicInteger();
- private static final String NAME = "Phoenix Sink__";
-
- private SinkCounter sinkCounter;
- private Integer batchSize;
- private EventSerializer serializer;
-
- public PhoenixSink(){
- }
-
- @Override
- public void configure(Context context){
- this.setName(NAME + counter.incrementAndGet());
- this.batchSize = context.getInteger(FlumeConstants.CONFIG_BATCHSIZE,
FlumeConstants.DEFAULT_BATCH_SIZE);
- final String eventSerializerType =
context.getString(FlumeConstants.CONFIG_SERIALIZER);
-
- if (eventSerializerType == null) {
- throw new NullPointerException("Event serializer cannot be empty,
please specify in the configuration file");
- }
- initializeSerializer(context,eventSerializerType);
- this.sinkCounter = new SinkCounter(this.getName());
- }
-
- /**
- * Initializes the serializer for flume events.
- * @param eventSerializerType
- */
- private void initializeSerializer(final Context context,final String
eventSerializerType) {
- String serializerClazz = null;
- EventSerializers eventSerializer = null;
-
- try {
- eventSerializer =
EventSerializers.valueOf(eventSerializerType.toUpperCase());
- } catch(IllegalArgumentException iae) {
- serializerClazz = eventSerializerType;
- }
-
- final Context serializerContext = new Context();
-
serializerContext.putAll(context.getSubProperties(FlumeConstants.CONFIG_SERIALIZER_PREFIX));
- copyPropertiesToSerializerContext(context,serializerContext);
-
- try {
- @SuppressWarnings("unchecked")
- Class<? extends EventSerializer> clazz = null;
- if(serializerClazz == null) {
- clazz = (Class<? extends EventSerializer>)
Class.forName(eventSerializer.getClassName());
- }
- else {
- clazz = (Class<? extends EventSerializer>)
Class.forName(serializerClazz);
- }
-
- serializer = clazz.newInstance();
- serializer.configure(serializerContext);
-
- } catch (Exception e) {
- logger.error("Could not instantiate event serializer." , e);
- if (e instanceof RuntimeException){
- throw (RuntimeException) e;
- }
- else {
- throw new RuntimeException(e);
- }
- }
- }
-
- private void copyPropertiesToSerializerContext(Context context, Context
serializerContext) {
-
-
serializerContext.put(FlumeConstants.CONFIG_TABLE_DDL,context.getString(FlumeConstants.CONFIG_TABLE_DDL));
-
serializerContext.put(FlumeConstants.CONFIG_TABLE,context.getString(FlumeConstants.CONFIG_TABLE));
-
serializerContext.put(FlumeConstants.CONFIG_ZK_QUORUM,context.getString(FlumeConstants.CONFIG_ZK_QUORUM));
-
serializerContext.put(FlumeConstants.CONFIG_JDBC_URL,context.getString(FlumeConstants.CONFIG_JDBC_URL));
-
serializerContext.put(FlumeConstants.CONFIG_BATCHSIZE,context.getString(FlumeConstants.CONFIG_BATCHSIZE));
- }
-
- @Override
- public void start() {
- logger.info("Starting sink {} ",this.getName());
- sinkCounter.start();
- try {
- serializer.initialize();
- sinkCounter.incrementConnectionCreatedCount();
- } catch(Exception ex) {
- sinkCounter.incrementConnectionFailedCount();
- logger.error("Error {} in initializing the
serializer.",ex.getMessage());
- if (ex instanceof RuntimeException){
- throw (RuntimeException) ex;
- }
- else {
- throw new RuntimeException(ex);
- }
- }
- super.start();
- }
-
- @Override
- public void stop(){
- super.stop();
- try {
- serializer.close();
- } catch (SQLException e) {
- logger.error(" Error while closing connection {} for sink {}
",e.getMessage(),this.getName());
- }
- sinkCounter.incrementConnectionClosedCount();
- sinkCounter.stop();
- }
-
- @Override
- public Status process() throws EventDeliveryException {
-
- Status status = Status.READY;
- Channel channel = getChannel();
- Transaction transaction = null;
- List<Event> events = new ArrayList<>(this.batchSize);
- long startTime = System.nanoTime();
- try {
- transaction = channel.getTransaction();
- transaction.begin();
-
- for(long i = 0; i < this.batchSize; i++) {
- Event event = channel.take();
- if(event == null){
- status = Status.BACKOFF;
- if (i == 0) {
- sinkCounter.incrementBatchEmptyCount();
- } else {
- sinkCounter.incrementBatchUnderflowCount();
- }
- break;
- } else {
- events.add(event);
- }
- }
- if (!events.isEmpty()) {
- if (events.size() == this.batchSize) {
- sinkCounter.incrementBatchCompleteCount();
- }
- else {
- sinkCounter.incrementBatchUnderflowCount();
- status = Status.BACKOFF;
- }
- // save to Hbase
- serializer.upsertEvents(events);
- sinkCounter.addToEventDrainSuccessCount(events.size());
- }
- else {
- logger.debug("no events to process ");
- sinkCounter.incrementBatchEmptyCount();
- status = Status.BACKOFF;
- }
- transaction.commit();
- } catch (ChannelException e) {
- transaction.rollback();
- status = Status.BACKOFF;
- sinkCounter.incrementConnectionFailedCount();
- }
- catch (SQLException e) {
- sinkCounter.incrementConnectionFailedCount();
- transaction.rollback();
- logger.error("exception while persisting to Hbase ", e);
- throw new EventDeliveryException("Failed to persist message to
Hbase", e);
- }
- catch (Throwable e) {
- transaction.rollback();
- logger.error("exception while processing in Phoenix Sink", e);
- throw new EventDeliveryException("Failed to persist message", e);
- }
- finally {
- logger.info(String.format("Time taken to process [%s] events was
[%s] seconds",
- events.size(),
- TimeUnit.SECONDS.convert(System.nanoTime() - startTime,
TimeUnit.NANOSECONDS)));
- if( transaction != null ) {
- transaction.close();
- }
- }
- return status;
- }
-
-}
diff --git a/pom.xml b/pom.xml
index b1a590a..461c073 100644
--- a/pom.xml
+++ b/pom.xml
@@ -45,7 +45,6 @@
<!-- Changing the module order here may cause maven to get stuck in an
infinite loop -->
<module>phoenix5-hive</module>
<module>phoenix5-hive-shaded</module>
- <module>phoenix5-flume</module>
<module>phoenix5-spark</module>
<module>phoenix5-spark-shaded</module>
<module>phoenix5-spark3</module>
@@ -98,7 +97,6 @@
<hive.version>${hive3.version}</hive.version>
<hive3-storage.version>2.7.0</hive3-storage.version>
<hive-storage.version>${hive3-storage.version}</hive-storage.version>
- <flume.version>1.4.0</flume.version>
<spark.version>2.4.0</spark.version>
<spark3.version>3.0.3</spark3.version>
<scala.version>2.11.12</scala.version>
@@ -506,11 +504,6 @@
<artifactId>phoenix-shaded-guava</artifactId>
<version>${phoenix.thirdparty.version}</version>
</dependency>
- <dependency>
- <groupId>org.apache.phoenix</groupId>
- <artifactId>phoenix5-flume</artifactId>
- <version>${project.version}</version>
- </dependency>
<dependency>
<groupId>org.apache.phoenix</groupId>
<artifactId>phoenix5-spark</artifactId>
@@ -815,39 +808,6 @@
<artifactId>log4j</artifactId>
<version>${log4j.version}</version>
</dependency>
- <dependency>
- <groupId>org.apache.flume</groupId>
- <artifactId>flume-ng-core</artifactId>
- <version>${flume.version}</version>
- <exclusions>
- <exclusion>
- <groupId>org.xerial.snappy</groupId>
- <artifactId>snappy-java</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.flume</groupId>
- <artifactId>flume-ng-sdk</artifactId>
- <version>${flume.version}</version>
- <exclusions>
- <exclusion>
- <groupId>org.xerial.snappy</groupId>
- <artifactId>snappy-java</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.flume</groupId>
- <artifactId>flume-ng-configuration</artifactId>
- <version>${flume.version}</version>
- <exclusions>
- <exclusion>
- <groupId>org.xerial.snappy</groupId>
- <artifactId>snappy-java</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>