This is an automated email from the ASF dual-hosted git repository.

stoty pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix-connectors.git


The following commit(s) were added to refs/heads/master by this push:
     new 062b34c  PHOENIX-6941 Remove Phoenix Flume connector
062b34c is described below

commit 062b34cfa7ce39af22aa09b6cb12f19ce61d17ac
Author: Aron Meszaros <[email protected]>
AuthorDate: Mon Oct 9 17:18:01 2023 +0200

    PHOENIX-6941 Remove Phoenix Flume connector
---
 README.md                                          |   2 +-
 phoenix5-connectors-assembly/pom.xml               |  24 -
 .../src/build/components/phoenix5-jars.xml         |  10 -
 phoenix5-flume/pom.xml                             | 177 -------
 .../apache/phoenix/flume/CsvEventSerializerIT.java | 451 ----------------
 .../phoenix/flume/JsonEventSerializerIT.java       | 575 ---------------------
 .../org/apache/phoenix/flume/PhoenixSinkIT.java    | 290 -----------
 .../phoenix/flume/RegexEventSerializerIT.java      | 447 ----------------
 .../phoenix/flume/serializer/CustomSerializer.java |  43 --
 .../apache/phoenix/flume/sink/NullPhoenixSink.java |  21 -
 .../apache/phoenix/flume/DefaultKeyGenerator.java  |  69 ---
 .../org/apache/phoenix/flume/FlumeConstants.java   |  94 ----
 .../org/apache/phoenix/flume/KeyGenerator.java     |  24 -
 .../org/apache/phoenix/flume/SchemaHandler.java    |  48 --
 .../flume/serializer/BaseEventSerializer.java      | 252 ---------
 .../flume/serializer/CsvEventSerializer.java       | 209 --------
 .../phoenix/flume/serializer/EventSerializer.java  |  42 --
 .../phoenix/flume/serializer/EventSerializers.java |  36 --
 .../flume/serializer/JsonEventSerializer.java      | 232 ---------
 .../flume/serializer/RegexEventSerializer.java     | 150 ------
 .../org/apache/phoenix/flume/sink/PhoenixSink.java | 221 --------
 pom.xml                                            |  40 --
 22 files changed, 1 insertion(+), 3456 deletions(-)

diff --git a/README.md b/README.md
index fec24b6..cb1c4ba 100644
--- a/README.md
+++ b/README.md
@@ -22,4 +22,4 @@ limitations under the License.
 Copyright ©2019 [Apache Software Foundation](http://www.apache.org/). All 
Rights Reserved. 
 
 ## Introduction
-This repo contains the Flume, Spark and Hive connectors for Phoenix.
\ No newline at end of file
+This repo contains the Spark and Hive connectors for Phoenix.
\ No newline at end of file
diff --git a/phoenix5-connectors-assembly/pom.xml 
b/phoenix5-connectors-assembly/pom.xml
index 917ec5e..ed8a56d 100644
--- a/phoenix5-connectors-assembly/pom.xml
+++ b/phoenix5-connectors-assembly/pom.xml
@@ -37,10 +37,6 @@
   </properties>
 
   <dependencies>
-    <dependency>
-      <groupId>org.apache.phoenix</groupId>
-      <artifactId>phoenix5-flume</artifactId>
-    </dependency>
     <dependency>
       <groupId>org.apache.phoenix</groupId>
       <artifactId>phoenix5-hive</artifactId>
@@ -69,26 +65,6 @@
         <artifactId>exec-maven-plugin</artifactId>
         <groupId>org.codehaus.mojo</groupId>
         <executions>
-          <execution>
-            <id>flume without version</id>
-            <phase>package</phase>
-            <goals>
-              <goal>exec</goal>
-            </goals>
-            <configuration>
-              <executable>ln</executable>
-              
<workingDirectory>${project.basedir}/../phoenix5-flume/target</workingDirectory>
-              <arguments>
-                <argument>-fnsv</argument>
-                <argument>
-                  phoenix5-flume-${project.version}.jar
-                </argument>
-                <argument>
-                  phoenix5-flume.jar
-                </argument>
-              </arguments>
-            </configuration>
-          </execution>
           <execution>
             <id>hive without version</id>
             <phase>package</phase>
diff --git 
a/phoenix5-connectors-assembly/src/build/components/phoenix5-jars.xml 
b/phoenix5-connectors-assembly/src/build/components/phoenix5-jars.xml
index 589c5d8..ac02e2f 100644
--- a/phoenix5-connectors-assembly/src/build/components/phoenix5-jars.xml
+++ b/phoenix5-connectors-assembly/src/build/components/phoenix5-jars.xml
@@ -21,16 +21,6 @@
 -->
 <component>
   <fileSets>
-    <fileSet>
-      <!-- We don't actually have shaded flume JARs ATM, 
-      but there's no point in packaging the unshaded ones. -->
-      <directory>${project.basedir}/../phoenix5-flume/target</directory>
-      <outputDirectory>/</outputDirectory>
-      <includes>
-        <include>phoenix5-flume-${project.version}.jar</include>
-        <include>phoenix5-flume.jar</include>
-      </includes>
-    </fileSet>
     <fileSet>
       <directory>${project.basedir}/../phoenix5-spark-shaded/target</directory>
       <outputDirectory>/</outputDirectory>
diff --git a/phoenix5-flume/pom.xml b/phoenix5-flume/pom.xml
deleted file mode 100644
index 8623794..0000000
--- a/phoenix5-flume/pom.xml
+++ /dev/null
@@ -1,177 +0,0 @@
-<?xml version='1.0'?>
-<!--
-
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements.  See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership.  The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License.  You may obtain a copy of the License at
-
-   http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied.  See the License for the
- specific language governing permissions and limitations
- under the License.
-
--->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0";
-  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
-  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
-  <modelVersion>4.0.0</modelVersion>
-  <parent>
-    <groupId>org.apache.phoenix</groupId>
-    <artifactId>phoenix-connectors</artifactId>
-    <version>6.0.0-SNAPSHOT</version>
-  </parent>
-  <artifactId>phoenix5-flume</artifactId>
-  <name>Phoenix Flume Connector for Phoenix 5</name>
-
-  <properties>
-    <top.dir>${project.basedir}/..</top.dir>
-  </properties>
-
-  <dependencies>
-    <dependency>
-      <groupId>org.apache.phoenix</groupId>
-      <artifactId>phoenix-core</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.phoenix</groupId>
-      <artifactId>phoenix-hbase-compat-${hbase.compat.version}</artifactId>
-      <scope>runtime</scope>
-      <optional>true</optional>
-    </dependency>
-
-    <dependency>
-      <groupId>commons-collections</groupId>
-      <artifactId>commons-collections</artifactId>
-      <version>${commons-collections.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.slf4j</groupId>
-      <artifactId>slf4j-api</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>com.tdunning</groupId>
-      <artifactId>json</artifactId>
-      <version>1.8</version>
-    </dependency>
-    <dependency>
-      <groupId>com.jayway.jsonpath</groupId>
-      <artifactId>json-path</artifactId>
-      <version>2.2.0</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.commons</groupId>
-      <artifactId>commons-csv</artifactId>
-      <version>${commons-csv.version}</version>
-    </dependency>
-
-    <!-- Test Dependencies -->
-    <dependency>
-      <groupId>org.apache.hbase</groupId>
-      <artifactId>hbase-it</artifactId>
-      <type>test-jar</type>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-minicluster</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-common</artifactId>
-      <scope>test</scope>
-    </dependency>
-
-    <!-- Main dependency on flume. The last to avoid using old commons-io 
-      in IT -->
-    <dependency>
-      <groupId>org.apache.flume</groupId>
-      <artifactId>flume-ng-core</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.flume</groupId>
-      <artifactId>flume-ng-sdk</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.flume</groupId>
-      <artifactId>flume-ng-configuration</artifactId>
-    </dependency>
-  </dependencies>
-
-  <build>
-    <pluginManagement>
-      <plugins>
-        <plugin>
-          <groupId>org.apache.maven.plugins</groupId>
-          <artifactId>maven-dependency-plugin</artifactId>
-          <configuration>
-            <!-- AFAICT these are bogus dependency problems -->
-            <ignoredUnusedDeclaredDependencies>
-              <ignoredUnusedDeclaredDependency>
-                org.apache.hbase:hbase-it
-              </ignoredUnusedDeclaredDependency>
-              <ignoredUnusedDeclaredDependency>
-                org.apache.hadoop:hadoop-minicluster
-              </ignoredUnusedDeclaredDependency>
-              <ignoredUnusedDeclaredDependency>
-                org.apache.hadoop:hadoop-common
-              </ignoredUnusedDeclaredDependency>
-              <ignoredUnusedDeclaredDependency>
-                org.apache.phoenix:phoenix-hbase-compat-${hbase.compat.version}
-              </ignoredUnusedDeclaredDependency>
-            </ignoredUnusedDeclaredDependencies>
-          </configuration>
-        </plugin>
-        <plugin>
-          <groupId>org.codehaus.mojo</groupId>
-          <artifactId>build-helper-maven-plugin</artifactId>
-          <version>3.0.0</version>
-          <executions>
-            <execution>
-              <id>add-parent-source</id>
-              <phase>generate-sources</phase>
-              <goals>
-                <goal>add-source</goal>
-              </goals>
-              <configuration>
-                <sources>
-                  <source>${project.parent.basedir}/src/main/java</source>
-                </sources>
-              </configuration>
-            </execution>
-            <execution>
-              <id>add-parent-test-source</id>
-              <phase>generate-sources</phase>
-              <goals>
-                <goal>add-test-source</goal>
-              </goals>
-              <configuration>
-                <sources>
-                  <source>${project.parent.basedir}/src/it/java</source>
-                </sources>
-              </configuration>
-            </execution>
-          </executions>
-        </plugin>
-        <plugin>
-          <groupId>org.apache.maven.plugins</groupId>
-          <artifactId>maven-failsafe-plugin</artifactId>
-        </plugin>
-        <plugin>
-          <groupId>org.apache.maven.plugins</groupId>
-          <artifactId>maven-resources-plugin</artifactId>
-        </plugin>
-      </plugins>
-    </pluginManagement>
-  </build>
-</project>
diff --git 
a/phoenix5-flume/src/it/java/org/apache/phoenix/flume/CsvEventSerializerIT.java 
b/phoenix5-flume/src/it/java/org/apache/phoenix/flume/CsvEventSerializerIT.java
deleted file mode 100644
index 1db9d2f..0000000
--- 
a/phoenix5-flume/src/it/java/org/apache/phoenix/flume/CsvEventSerializerIT.java
+++ /dev/null
@@ -1,451 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.flume;
-
-import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-import org.apache.flume.Channel;
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.EventDeliveryException;
-import org.apache.flume.Transaction;
-import org.apache.flume.channel.MemoryChannel;
-import org.apache.flume.conf.Configurables;
-import org.apache.flume.event.EventBuilder;
-import org.apache.flume.lifecycle.LifecycleState;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
-import org.apache.phoenix.flume.serializer.EventSerializers;
-import org.apache.phoenix.flume.sink.PhoenixSink;
-import org.apache.phoenix.util.PropertiesUtil;
-import org.apache.phoenix.util.ReadOnlyProps;
-import org.apache.phoenix.query.BaseTest;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-
-@Category(NeedsOwnMiniClusterTest.class)
-public class CsvEventSerializerIT extends BaseTest {
-
-       private Context sinkContext;
-       private PhoenixSink sink;
-
-    @BeforeClass
-    public static synchronized void doSetup() throws Exception {
-        setUpTestDriver(ReadOnlyProps.EMPTY_PROPS);
-    }
-
-    @AfterClass
-    public static synchronized void doTeardown() throws Exception {
-        dropNonSystemTables();
-    }
-
-    @After
-    public void cleanUpAfterTest() throws Exception {
-        deletePriorMetaData(HConstants.LATEST_TIMESTAMP, getUrl());
-    }
-
-       @Test
-       public void testWithDefaultDelimiters() throws EventDeliveryException, 
SQLException {
-
-               final String fullTableName = "FLUME_CSV_TEST";
-
-               String ddl = "CREATE TABLE IF NOT EXISTS " + fullTableName
-                               + "  (flume_time timestamp not null, col1 
varchar , col2 double, col3 varchar[], col4 integer[]"
-                               + "  CONSTRAINT pk PRIMARY KEY (flume_time))\n";
-               String columns = "col1,col2,col3,col4";
-               String rowkeyType = DefaultKeyGenerator.TIMESTAMP.name();
-               initSinkContext(fullTableName, ddl, columns, null, null, null, 
null, rowkeyType, null);
-
-               sink = new PhoenixSink();
-               Configurables.configure(sink, sinkContext);
-
-               assertEquals(LifecycleState.IDLE, sink.getLifecycleState());
-
-               final Channel channel = this.initChannel();
-               sink.setChannel(channel);
-
-               sink.start();
-
-               final String eventBody = 
"kalyan,10.5,\"abc,pqr,xyz\",\"1,2,3,4\"";
-               final Event event = 
EventBuilder.withBody(Bytes.toBytes(eventBody));
-               // put event in channel
-               Transaction transaction = channel.getTransaction();
-               transaction.begin();
-               channel.put(event);
-               transaction.commit();
-               transaction.close();
-
-               sink.process();
-
-               int rowsInDb = countRows(fullTableName);
-               assertEquals(1, rowsInDb);
-
-               sink.stop();
-               assertEquals(LifecycleState.STOP, sink.getLifecycleState());
-
-               dropTable(fullTableName);
-       }
-
-       @Test
-       public void testKeyGenerator() throws EventDeliveryException, 
SQLException {
-
-               final String fullTableName = "FLUME_CSV_TEST";
-               initSinkContextWithDefaults(fullTableName);
-
-               sink = new PhoenixSink();
-               Configurables.configure(sink, sinkContext);
-
-               assertEquals(LifecycleState.IDLE, sink.getLifecycleState());
-
-               final Channel channel = this.initChannel();
-               sink.setChannel(channel);
-
-               sink.start();
-               final String eventBody = 
"kalyan,10.5,\"abc,pqr,xyz\",\"1,2,3,4\"";
-               final Event event = 
EventBuilder.withBody(Bytes.toBytes(eventBody));
-               // put event in channel
-               Transaction transaction = channel.getTransaction();
-               transaction.begin();
-               channel.put(event);
-               transaction.commit();
-               transaction.close();
-
-               sink.process();
-
-               int rowsInDb = countRows(fullTableName);
-               assertEquals(1, rowsInDb);
-
-               sink.stop();
-               assertEquals(LifecycleState.STOP, sink.getLifecycleState());
-
-               dropTable(fullTableName);
-       }
-
-       @Test
-       public void testMismatchKeyGenerator() throws EventDeliveryException, 
SQLException {
-
-               final String fullTableName = "FLUME_CSV_TEST";
-               initSinkContextWithDefaults(fullTableName);
-               setConfig(FlumeConstants.CONFIG_SERIALIZER_PREFIX + 
FlumeConstants.CONFIG_ROWKEY_TYPE_GENERATOR,
-                               DefaultKeyGenerator.UUID.name());
-
-               sink = new PhoenixSink();
-               Configurables.configure(sink, sinkContext);
-               assertEquals(LifecycleState.IDLE, sink.getLifecycleState());
-
-               final Channel channel = this.initChannel();
-               sink.setChannel(channel);
-
-               sink.start();
-               final String eventBody = 
"kalyan,10.5,\"abc,pqr,xyz\",\"1,2,3,4\"";
-               final Event event = 
EventBuilder.withBody(Bytes.toBytes(eventBody));
-               // put event in channel
-               Transaction transaction = channel.getTransaction();
-               transaction.begin();
-               channel.put(event);
-               transaction.commit();
-               transaction.close();
-
-               try {
-                       sink.process();
-                       fail();
-               } catch (Exception ex) {
-                       
assertTrue(ex.getCause().getMessage().contains("java.lang.IllegalArgumentException:
 Invalid format:"));
-               }
-
-               dropTable(fullTableName);
-       }
-
-       @Test
-       public void testMissingColumnsInEvent() throws EventDeliveryException, 
SQLException {
-
-               final String fullTableName = "FLUME_CSV_TEST";
-               initSinkContextWithDefaults(fullTableName);
-
-               sink = new PhoenixSink();
-               Configurables.configure(sink, sinkContext);
-               assertEquals(LifecycleState.IDLE, sink.getLifecycleState());
-
-               final Channel channel = this.initChannel();
-               sink.setChannel(channel);
-
-               sink.start();
-               final String eventBody = "kalyan,\"abc,pqr,xyz\",\"1,2,3,4\"";
-               final Event event = 
EventBuilder.withBody(Bytes.toBytes(eventBody));
-               // put event in channel
-               Transaction transaction = channel.getTransaction();
-               transaction.begin();
-               channel.put(event);
-               transaction.commit();
-               transaction.close();
-
-               sink.process();
-
-               int rowsInDb = countRows(fullTableName);
-               assertEquals(0, rowsInDb);
-
-               sink.stop();
-               assertEquals(LifecycleState.STOP, sink.getLifecycleState());
-
-               dropTable(fullTableName);
-       }
-
-       @Test
-       public void testBatchEvents() throws EventDeliveryException, 
SQLException {
-
-               final String fullTableName = "FLUME_CSV_TEST";
-               initSinkContextWithDefaults(fullTableName);
-
-               sink = new PhoenixSink();
-               Configurables.configure(sink, sinkContext);
-               assertEquals(LifecycleState.IDLE, sink.getLifecycleState());
-
-               final Channel channel = this.initChannel();
-               sink.setChannel(channel);
-
-               sink.start();
-               int numEvents = 150;
-               String col1 = "val1";
-               String a1 = "\"aaa,bbb,ccc\"";
-               String a2 = "\"1,2,3,4\"";
-               String eventBody = null;
-               List<Event> eventList = new ArrayList<>(numEvents);
-               for (int i = 0; i < eventList.size(); i++) {
-                       eventBody = (col1 + i) + "," + i * 10.5 + "," + a1 + 
"," + a2;
-                       Event event = 
EventBuilder.withBody(Bytes.toBytes(eventBody));
-                       eventList.add(event);
-               }
-
-               // put event in channel
-               Transaction transaction = channel.getTransaction();
-               transaction.begin();
-               for (Event event : eventList) {
-                       channel.put(event);
-               }
-               transaction.commit();
-               transaction.close();
-
-               sink.process();
-
-               int rowsInDb = countRows(fullTableName);
-               assertEquals(eventList.size(), rowsInDb);
-
-               sink.stop();
-               assertEquals(LifecycleState.STOP, sink.getLifecycleState());
-
-               dropTable(fullTableName);
-       }
-
-       @Test
-       public void testEventsWithHeaders() throws Exception {
-
-               sinkContext = new Context();
-               final String fullTableName = "FLUME_CSV_TEST";
-               final String ddl = "CREATE TABLE IF NOT EXISTS "
-                               + fullTableName
-                               + "  (rowkey VARCHAR not null, col1 varchar , 
col2 double, col3 varchar[], col4 integer[], host varchar , source varchar \n"
-                               + "  CONSTRAINT pk PRIMARY KEY (rowkey))\n";
-               String columns = "col1,col2,col3,col4";
-               String rowkeyType = DefaultKeyGenerator.UUID.name();
-               String headers = "host,source";
-               initSinkContext(fullTableName, ddl, columns, null, null, null, 
null, rowkeyType, headers);
-
-               sink = new PhoenixSink();
-               Configurables.configure(sink, sinkContext);
-               assertEquals(LifecycleState.IDLE, sink.getLifecycleState());
-
-               final Channel channel = this.initChannel();
-               sink.setChannel(channel);
-
-               sink.start();
-
-               int numEvents = 10;
-               String col1 = "val1";
-               String a1 = "\"aaa,bbb,ccc\"";
-               String a2 = "\"1,2,3,4\"";
-               String hostHeader = "host1";
-               String sourceHeader = "source1";
-               String eventBody = null;
-               List<Event> eventList = new ArrayList<>(numEvents);
-               for (int i = 0; i < numEvents; i++) {
-                       eventBody = (col1 + i) + "," + i * 10.5 + "," + a1 + 
"," + a2;
-                       Map<String, String> headerMap = new HashMap<>(2);
-                       headerMap.put("host", hostHeader);
-                       headerMap.put("source", sourceHeader);
-                       Event event = 
EventBuilder.withBody(Bytes.toBytes(eventBody), headerMap);
-                       eventList.add(event);
-               }
-
-               // put event in channel
-               Transaction transaction = channel.getTransaction();
-               transaction.begin();
-               for (Event event : eventList) {
-                       channel.put(event);
-               }
-               transaction.commit();
-               transaction.close();
-
-               sink.process();
-
-               final String query = " SELECT * FROM \n " + fullTableName;
-               Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-               final ResultSet rs;
-               final Connection conn = DriverManager.getConnection(getUrl(), 
props);
-               try {
-                       rs = conn.createStatement().executeQuery(query);
-                       assertTrue(rs.next());
-                       assertEquals("host1", rs.getString("host"));
-                       assertEquals("source1", rs.getString("source"));
-
-                       assertTrue(rs.next());
-                       assertEquals("host1", rs.getString("host"));
-                       assertEquals("source1", rs.getString("source"));
-               } finally {
-                       if (conn != null) {
-                               conn.close();
-                       }
-               }
-               sink.stop();
-               assertEquals(LifecycleState.STOP, sink.getLifecycleState());
-
-               dropTable(fullTableName);
-       }
-
-       private Channel initChannel() {
-               // Channel configuration
-               Context channelContext = new Context();
-               channelContext.put("capacity", "10000");
-               channelContext.put("transactionCapacity", "200");
-
-               Channel channel = new MemoryChannel();
-               channel.setName("memorychannel");
-               Configurables.configure(channel, channelContext);
-               return channel;
-       }
-
-       private void initSinkContext(final String fullTableName, final String 
ddl, final String columns,
-                       final String csvDelimiter, final String csvQuote, final 
String csvEscape, final String csvArrayDelimiter,
-                       final String rowkeyType, final String headers) {
-               if (fullTableName == null){
-                       throw new NullPointerException();
-               }
-               sinkContext = new Context();
-               sinkContext.put(FlumeConstants.CONFIG_TABLE, fullTableName);
-               sinkContext.put(FlumeConstants.CONFIG_JDBC_URL, getUrl());
-               sinkContext.put(FlumeConstants.CONFIG_SERIALIZER, 
EventSerializers.CSV.name());
-               sinkContext.put(FlumeConstants.CONFIG_TABLE_DDL, ddl);
-               sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX + 
FlumeConstants.CONFIG_COLUMN_NAMES, columns);
-               if (null != csvDelimiter)
-                       sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX 
+ FlumeConstants.CSV_DELIMITER, csvDelimiter);
-               if (null != csvQuote)
-                       sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX 
+ FlumeConstants.CSV_QUOTE, csvQuote);
-               if (null != csvEscape)
-                       sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX 
+ FlumeConstants.CSV_ESCAPE, csvEscape);
-               if (null != csvArrayDelimiter)
-                       sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX 
+ FlumeConstants.CSV_ARRAY_DELIMITER,
-                                       csvArrayDelimiter);
-               if (null != rowkeyType)
-                       sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX 
+ FlumeConstants.CONFIG_ROWKEY_TYPE_GENERATOR,
-                                       rowkeyType);
-               if (null != headers)
-                       sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX 
+ FlumeConstants.CONFIG_HEADER_NAMES, headers);
-       }
-
-       private void initSinkContextWithDefaults(final String fullTableName) {
-               String ddl = "CREATE TABLE IF NOT EXISTS " + fullTableName
-                               + "  (flume_time timestamp not null, col1 
varchar , col2 double, col3 varchar[], col4 integer[]"
-                               + "  CONSTRAINT pk PRIMARY KEY (flume_time))\n";
-               String columns = "col1,col2,col3,col4";
-               String rowkeyType = DefaultKeyGenerator.TIMESTAMP.name();
-               initSinkContext(fullTableName, ddl, columns, null, null, null, 
null, rowkeyType, null);
-       }
-
-       private void setConfig(final String configName, final String 
configValue) {
-               if (sinkContext == null){
-                       throw new NullPointerException();
-               }
-               if (configName == null){
-                       throw new NullPointerException();
-               }
-               if (configValue == null){
-                       throw new NullPointerException();
-               }
-               sinkContext.put(configName, configValue);
-       }
-
-       private int countRows(final String fullTableName) throws SQLException {
-               if (fullTableName == null){
-                       throw new NullPointerException();
-               }
-               Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-               final Connection conn = DriverManager.getConnection(getUrl(), 
props);
-               ResultSet rs = null;
-               try {
-                       rs = conn.createStatement().executeQuery("select 
count(*) from " + fullTableName);
-                       int rowsCount = 0;
-                       while (rs.next()) {
-                               rowsCount = rs.getInt(1);
-                       }
-                       return rowsCount;
-
-               } finally {
-                       if (rs != null) {
-                               rs.close();
-                       }
-                       if (conn != null) {
-                               conn.close();
-                       }
-               }
-
-       }
-
-       private void dropTable(final String fullTableName) throws SQLException {
-               if (fullTableName == null){
-                       throw new NullPointerException();
-               }
-
-               Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-               final Connection conn = DriverManager.getConnection(getUrl(), 
props);
-               try {
-                       conn.createStatement().execute("drop table if exists " 
+ fullTableName);
-               } finally {
-                       if (conn != null) {
-                               conn.close();
-                       }
-               }
-       }
-
-}
diff --git 
a/phoenix5-flume/src/it/java/org/apache/phoenix/flume/JsonEventSerializerIT.java
 
b/phoenix5-flume/src/it/java/org/apache/phoenix/flume/JsonEventSerializerIT.java
deleted file mode 100644
index d53a560..0000000
--- 
a/phoenix5-flume/src/it/java/org/apache/phoenix/flume/JsonEventSerializerIT.java
+++ /dev/null
@@ -1,575 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.flume;
-
-import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-import org.apache.flume.Channel;
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.EventDeliveryException;
-import org.apache.flume.Transaction;
-import org.apache.flume.channel.MemoryChannel;
-import org.apache.flume.conf.Configurables;
-import org.apache.flume.event.EventBuilder;
-import org.apache.flume.lifecycle.LifecycleState;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
-import org.apache.phoenix.query.BaseTest;
-import org.apache.phoenix.flume.serializer.EventSerializers;
-import org.apache.phoenix.flume.sink.PhoenixSink;
-import org.apache.phoenix.query.BaseTest;
-import org.apache.phoenix.util.PropertiesUtil;
-import org.apache.phoenix.util.ReadOnlyProps;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-@Category(NeedsOwnMiniClusterTest.class)
-public class JsonEventSerializerIT extends BaseTest {
-
-       private Context sinkContext;
-       private PhoenixSink sink;
-
-    @BeforeClass
-    public static synchronized void doSetup() throws Exception {
-        setUpTestDriver(ReadOnlyProps.EMPTY_PROPS);
-    }
-
-    @AfterClass
-    public static synchronized void doTeardown() throws Exception {
-        dropNonSystemTables();
-    }
-
-    @After
-    public void cleanUpAfterTest() throws Exception {
-        deletePriorMetaData(HConstants.LATEST_TIMESTAMP, getUrl());
-    }
-
-       @Test
-       public void testWithOutColumnsMapping() throws EventDeliveryException, 
SQLException {
-
-               final String fullTableName = "FLUME_JSON_TEST";
-
-               String ddl = "CREATE TABLE IF NOT EXISTS " + fullTableName
-                               + "  (flume_time timestamp not null, col1 
varchar , col2 double, col3 varchar[], col4 integer[]"
-                               + "  CONSTRAINT pk PRIMARY KEY (flume_time))\n";
-               String columns = "col1,col2,col3,col4";
-               String rowkeyType = DefaultKeyGenerator.TIMESTAMP.name();
-               initSinkContext(fullTableName, ddl, columns, null, rowkeyType, 
null);
-
-               sink = new PhoenixSink();
-               Configurables.configure(sink, sinkContext);
-
-               assertEquals(LifecycleState.IDLE, sink.getLifecycleState());
-
-               final Channel channel = this.initChannel();
-               sink.setChannel(channel);
-
-               sink.start();
-               final String eventBody = "{\"col1\" : \"kalyan\", \"col2\" : 
10.5, \"col3\" : [\"abc\",\"pqr\",\"xyz\"], \"col4\" : [1,2,3,4]}";
-               final Event event = 
EventBuilder.withBody(Bytes.toBytes(eventBody));
-               // put event in channel
-               Transaction transaction = channel.getTransaction();
-               transaction.begin();
-               channel.put(event);
-               transaction.commit();
-               transaction.close();
-
-               sink.process();
-
-               int rowsInDb = countRows(fullTableName);
-               assertEquals(1, rowsInDb);
-
-               sink.stop();
-               assertEquals(LifecycleState.STOP, sink.getLifecycleState());
-
-               dropTable(fullTableName);
-       }
-
-       @Test
-       public void testDifferentColumnNames() throws EventDeliveryException, 
SQLException {
-
-               final String fullTableName = "FLUME_JSON_TEST";
-
-               String ddl = "CREATE TABLE IF NOT EXISTS " + fullTableName
-                               + "  (flume_time timestamp not null, col1 
varchar , col2 double, col3 varchar[], col4 integer[]"
-                               + "  CONSTRAINT pk PRIMARY KEY (flume_time))\n";
-               String columns = "col1,col2,col3,col4";
-               String rowkeyType = DefaultKeyGenerator.TIMESTAMP.name();
-               String columnsMapping = 
"{\"col1\":\"col1\",\"col2\":\"f2\",\"col3\":\"f3\",\"col4\":\"col4\"}";
-
-               initSinkContext(fullTableName, ddl, columns, columnsMapping, 
rowkeyType, null);
-
-               sink = new PhoenixSink();
-               Configurables.configure(sink, sinkContext);
-
-               assertEquals(LifecycleState.IDLE, sink.getLifecycleState());
-
-               final Channel channel = this.initChannel();
-               sink.setChannel(channel);
-
-               sink.start();
-               final String eventBody = "{\"col1\" : \"kalyan\", \"f2\" : 
10.5, \"f3\" : [\"abc\",\"pqr\",\"xyz\"], \"col4\" : [1,2,3,4]}";
-               final Event event = 
EventBuilder.withBody(Bytes.toBytes(eventBody));
-               // put event in channel
-               Transaction transaction = channel.getTransaction();
-               transaction.begin();
-               channel.put(event);
-               transaction.commit();
-               transaction.close();
-
-               sink.process();
-
-               int rowsInDb = countRows(fullTableName);
-               assertEquals(1, rowsInDb);
-
-               sink.stop();
-               assertEquals(LifecycleState.STOP, sink.getLifecycleState());
-
-               dropTable(fullTableName);
-       }
-
-       @Test
-       public void testInnerColumns() throws EventDeliveryException, 
SQLException {
-
-               final String fullTableName = "FLUME_JSON_TEST";
-
-               String ddl = "CREATE TABLE IF NOT EXISTS " + fullTableName
-                               + "  (flume_time timestamp not null, col1 
varchar , col2 double, col3 varchar[], col4 integer[]"
-                               + "  CONSTRAINT pk PRIMARY KEY (flume_time))\n";
-               String columns = "col1,col2,col3,col4";
-               String rowkeyType = DefaultKeyGenerator.TIMESTAMP.name();
-               String columnsMapping = 
"{\"col1\":\"col1\",\"col2\":\"x.y\",\"col3\":\"a.b1.c\",\"col4\":\"col4\"}";
-
-               initSinkContext(fullTableName, ddl, columns, columnsMapping, 
rowkeyType, null);
-
-               sink = new PhoenixSink();
-               Configurables.configure(sink, sinkContext);
-
-               assertEquals(LifecycleState.IDLE, sink.getLifecycleState());
-
-               final Channel channel = this.initChannel();
-               sink.setChannel(channel);
-
-               sink.start();
-               final String eventBody = "{\"col1\" : \"kalyan\", \"x\" : 
{\"y\" : 10.5}, \"a\" : {\"b1\" : {\"c\" : [\"abc\",\"pqr\",\"xyz\"] }, \"b2\" 
: 111}, \"col4\" : [1,2,3,4]}";
-               final Event event = 
EventBuilder.withBody(Bytes.toBytes(eventBody));
-               // put event in channel
-               Transaction transaction = channel.getTransaction();
-               transaction.begin();
-               channel.put(event);
-               transaction.commit();
-               transaction.close();
-
-               sink.process();
-
-               int rowsInDb = countRows(fullTableName);
-               assertEquals(1, rowsInDb);
-
-               sink.stop();
-               assertEquals(LifecycleState.STOP, sink.getLifecycleState());
-
-               dropTable(fullTableName);
-       }
-
-       @Test
-       public void testInnerColumnsWithArrayMapping() throws 
EventDeliveryException, SQLException {
-
-               final String fullTableName = "FLUME_JSON_TEST";
-
-               String ddl = "CREATE TABLE IF NOT EXISTS " + fullTableName
-                               + "  (flume_time timestamp not null, col1 
varchar , col2 double, col3 varchar[], col4 integer[]"
-                               + "  CONSTRAINT pk PRIMARY KEY (flume_time))\n";
-               String columns = "col1,col2,col3,col4";
-               String rowkeyType = DefaultKeyGenerator.TIMESTAMP.name();
-               String columnsMapping = 
"{\"col1\":\"col1\",\"col2\":\"x.y\",\"col3\":\"a.b[*].c\",\"col4\":\"col4\"}";
-
-               initSinkContext(fullTableName, ddl, columns, columnsMapping, 
rowkeyType, null);
-
-               sink = new PhoenixSink();
-               Configurables.configure(sink, sinkContext);
-
-               assertEquals(LifecycleState.IDLE, sink.getLifecycleState());
-
-               final Channel channel = this.initChannel();
-               sink.setChannel(channel);
-
-               sink.start();
-               final String eventBody = "{\"col1\" : \"kalyan\", \"x\" : 
{\"y\" : 10.5}, \"a\" : {\"b\" : [{\"c\" : \"abc\"}, {\"c\" : \"pqr\"}, {\"c\" 
: \"xyz\"}] , \"b2\" : 111}, \"col4\" : [1,2,3,4]}";
-               final Event event = 
EventBuilder.withBody(Bytes.toBytes(eventBody));
-               // put event in channel
-               Transaction transaction = channel.getTransaction();
-               transaction.begin();
-               channel.put(event);
-               transaction.commit();
-               transaction.close();
-
-               sink.process();
-
-               int rowsInDb = countRows(fullTableName);
-               assertEquals(1, rowsInDb);
-
-               sink.stop();
-               assertEquals(LifecycleState.STOP, sink.getLifecycleState());
-
-               dropTable(fullTableName);
-       }
-
-       @Test
-       public void testKeyGenerator() throws EventDeliveryException, 
SQLException {
-
-               final String fullTableName = "FLUME_JSON_TEST";
-               initSinkContextWithDefaults(fullTableName);
-
-               sink = new PhoenixSink();
-               Configurables.configure(sink, sinkContext);
-
-               assertEquals(LifecycleState.IDLE, sink.getLifecycleState());
-
-               final Channel channel = this.initChannel();
-               sink.setChannel(channel);
-
-               sink.start();
-               final String eventBody = "{\"col1\" : \"kalyan\", \"col2\" : 
10.5, \"col3\" : [\"abc\",\"pqr\",\"xyz\"], \"col4\" : [1,2,3,4]}";
-               final Event event = 
EventBuilder.withBody(Bytes.toBytes(eventBody));
-               // put event in channel
-               Transaction transaction = channel.getTransaction();
-               transaction.begin();
-               channel.put(event);
-               transaction.commit();
-               transaction.close();
-
-               sink.process();
-
-               int rowsInDb = countRows(fullTableName);
-               assertEquals(1, rowsInDb);
-
-               sink.stop();
-               assertEquals(LifecycleState.STOP, sink.getLifecycleState());
-
-               dropTable(fullTableName);
-       }
-
-       @Test
-       public void testMismatchKeyGenerator() throws EventDeliveryException, 
SQLException {
-
-               final String fullTableName = "FLUME_JSON_TEST";
-               initSinkContextWithDefaults(fullTableName);
-               setConfig(FlumeConstants.CONFIG_SERIALIZER_PREFIX + 
FlumeConstants.CONFIG_ROWKEY_TYPE_GENERATOR,
-                               DefaultKeyGenerator.UUID.name());
-
-               sink = new PhoenixSink();
-               Configurables.configure(sink, sinkContext);
-               assertEquals(LifecycleState.IDLE, sink.getLifecycleState());
-
-               final Channel channel = this.initChannel();
-               sink.setChannel(channel);
-
-               sink.start();
-               final String eventBody = "{\"col1\" : \"kalyan\", \"col2\" : 
10.5, \"col3\" : [\"abc\",\"pqr\",\"xyz\"], \"col4\" : [1,2,3,4]}";
-               final Event event = 
EventBuilder.withBody(Bytes.toBytes(eventBody));
-               // put event in channel
-               Transaction transaction = channel.getTransaction();
-               transaction.begin();
-               channel.put(event);
-               transaction.commit();
-               transaction.close();
-
-               try {
-                       sink.process();
-                       fail();
-               } catch (Exception ex) {
-                       
assertTrue(ex.getCause().getMessage().contains("java.lang.IllegalArgumentException:
 Invalid format:"));
-               }
-
-               dropTable(fullTableName);
-       }
-
-       @Test
-       public void testMissingColumnsInEvent() throws EventDeliveryException, 
SQLException {
-
-               final String fullTableName = "FLUME_JSON_TEST";
-               initSinkContextWithDefaults(fullTableName);
-
-               sink = new PhoenixSink();
-               Configurables.configure(sink, sinkContext);
-               assertEquals(LifecycleState.IDLE, sink.getLifecycleState());
-
-               final Channel channel = this.initChannel();
-               sink.setChannel(channel);
-
-               sink.start();
-               final String eventBody = "{\"col1\" : \"kalyan\", \"col3\" : 
[\"abc\",\"pqr\",\"xyz\"], \"col4\" : [1,2,3,4]}";
-               final Event event = 
EventBuilder.withBody(Bytes.toBytes(eventBody));
-               // put event in channel
-               Transaction transaction = channel.getTransaction();
-               transaction.begin();
-               channel.put(event);
-               transaction.commit();
-               transaction.close();
-
-               sink.process();
-
-               int rowsInDb = countRows(fullTableName);
-               assertEquals(0, rowsInDb);
-
-               sink.stop();
-               assertEquals(LifecycleState.STOP, sink.getLifecycleState());
-
-               dropTable(fullTableName);
-       }
-
-       @Test
-       public void testBatchEvents() throws EventDeliveryException, 
SQLException {
-
-               final String fullTableName = "FLUME_JSON_TEST";
-               initSinkContextWithDefaults(fullTableName);
-
-               sink = new PhoenixSink();
-               Configurables.configure(sink, sinkContext);
-               assertEquals(LifecycleState.IDLE, sink.getLifecycleState());
-
-               final Channel channel = this.initChannel();
-               sink.setChannel(channel);
-
-               sink.start();
-               int numEvents = 150;
-               String col1 = "val1";
-               String a1 = "[aaa,bbb,ccc]";
-               String a2 = "[1,2,3,4]";
-               String eventBody = null;
-               List<Event> eventList =  new ArrayList<>(numEvents);
-               for (int i = 0; i < eventList.size(); i++) {
-                       eventBody = "{\"col1\" : \"" + (col1 + i) + "\", 
\"col2\" : " + i * 10.5 + " , \"col3\" : " + a1
-                                       + " , \"col4\" : " + a2 + "}";
-                       Event event = 
EventBuilder.withBody(Bytes.toBytes(eventBody));
-                       eventList.add(event);
-               }
-
-               // put event in channel
-               Transaction transaction = channel.getTransaction();
-               transaction.begin();
-               for (Event event : eventList) {
-                       channel.put(event);
-               }
-               transaction.commit();
-               transaction.close();
-
-               sink.process();
-
-               int rowsInDb = countRows(fullTableName);
-               assertEquals(eventList.size(), rowsInDb);
-
-               sink.stop();
-               assertEquals(LifecycleState.STOP, sink.getLifecycleState());
-
-               dropTable(fullTableName);
-       }
-
-       @Test
-       public void testEventsWithHeaders() throws Exception {
-
-               sinkContext = new Context();
-               final String fullTableName = "FLUME_JSON_TEST";
-               final String ddl = "CREATE TABLE IF NOT EXISTS "
-                               + fullTableName
-                               + "  (rowkey VARCHAR not null, col1 varchar , 
col2 double, col3 varchar[], col4 integer[], host varchar , source varchar \n"
-                               + "  CONSTRAINT pk PRIMARY KEY (rowkey))\n";
-               String columns = "col1,col2,col3,col4";
-               String columnsMapping = 
"{\"col1\":\"col1\",\"col2\":\"col2\",\"col3\":\"col3\",\"col4\":\"col4\"}";
-               String rowkeyType = DefaultKeyGenerator.UUID.name();
-               String headers = "host,source";
-               initSinkContext(fullTableName, ddl, columns, columnsMapping, 
rowkeyType, headers);
-
-               sink = new PhoenixSink();
-               Configurables.configure(sink, sinkContext);
-               assertEquals(LifecycleState.IDLE, sink.getLifecycleState());
-
-               final Channel channel = this.initChannel();
-               sink.setChannel(channel);
-
-               sink.start();
-
-               int numEvents = 10;
-               String col1 = "val1";
-               String a1 = "[aaa,bbb,ccc]";
-               String a2 = "[1,2,3,4]";
-               String hostHeader = "host1";
-               String sourceHeader = "source1";
-               String eventBody = null;
-               List<Event> eventList = new ArrayList<>(numEvents);
-               for (int i = 0; i < numEvents; i++) {
-                       eventBody = "{\"col1\" : \"" + (col1 + i) + "\", 
\"col2\" : " + i * 10.5 + " , \"col3\" : " + a1
-                                       + " , \"col4\" : " + a2 + "}";
-                       Map<String, String> headerMap = new HashMap<>(2);
-                       headerMap.put("host", hostHeader);
-                       headerMap.put("source", sourceHeader);
-                       Event event = 
EventBuilder.withBody(Bytes.toBytes(eventBody), headerMap);
-                       eventList.add(event);
-               }
-
-               // put event in channel
-               Transaction transaction = channel.getTransaction();
-               transaction.begin();
-               for (Event event : eventList) {
-                       channel.put(event);
-               }
-               transaction.commit();
-               transaction.close();
-
-               sink.process();
-
-               final String query = " SELECT * FROM \n " + fullTableName;
-               Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-               final ResultSet rs;
-               final Connection conn = DriverManager.getConnection(getUrl(), 
props);
-               try {
-                       rs = conn.createStatement().executeQuery(query);
-                       assertTrue(rs.next());
-                       assertEquals("host1", rs.getString("host"));
-                       assertEquals("source1", rs.getString("source"));
-
-                       assertTrue(rs.next());
-                       assertEquals("host1", rs.getString("host"));
-                       assertEquals("source1", rs.getString("source"));
-               } finally {
-                       if (conn != null) {
-                               conn.close();
-                       }
-               }
-               sink.stop();
-               assertEquals(LifecycleState.STOP, sink.getLifecycleState());
-
-               dropTable(fullTableName);
-       }
-
-       private Channel initChannel() {
-               // Channel configuration
-               Context channelContext = new Context();
-               channelContext.put("capacity", "10000");
-               channelContext.put("transactionCapacity", "200");
-
-               Channel channel = new MemoryChannel();
-               channel.setName("memorychannel");
-               Configurables.configure(channel, channelContext);
-               return channel;
-       }
-
-       private void initSinkContext(final String fullTableName, final String 
ddl, final String columns,
-                       final String columnsMapping, final String rowkeyType, 
final String headers) {
-               if (fullTableName == null) {
-                       throw new NullPointerException();
-               }
-               sinkContext = new Context();
-               sinkContext.put(FlumeConstants.CONFIG_TABLE, fullTableName);
-               sinkContext.put(FlumeConstants.CONFIG_JDBC_URL, getUrl());
-               sinkContext.put(FlumeConstants.CONFIG_SERIALIZER, 
EventSerializers.JSON.name());
-               sinkContext.put(FlumeConstants.CONFIG_TABLE_DDL, ddl);
-               sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX + 
FlumeConstants.CONFIG_COLUMN_NAMES, columns);
-               if (null != columnsMapping)
-                       sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX 
+ FlumeConstants.CONFIG_COLUMNS_MAPPING,
-                                       columnsMapping);
-               if (null != rowkeyType)
-                       sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX 
+ FlumeConstants.CONFIG_ROWKEY_TYPE_GENERATOR,
-                                       rowkeyType);
-               if (null != headers)
-                       sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX 
+ FlumeConstants.CONFIG_HEADER_NAMES, headers);
-       }
-
-       private void initSinkContextWithDefaults(final String fullTableName) {
-               String ddl = "CREATE TABLE IF NOT EXISTS " + fullTableName
-                               + "  (flume_time timestamp not null, col1 
varchar , col2 double, col3 varchar[], col4 integer[]"
-                               + "  CONSTRAINT pk PRIMARY KEY (flume_time))\n";
-               String columns = "col1,col2,col3,col4";
-               String columnsMapping = 
"{\"col1\":\"col1\",\"col2\":\"col2\",\"col3\":\"col3\",\"col4\":\"col4\"}";
-               String rowkeyType = DefaultKeyGenerator.TIMESTAMP.name();
-               initSinkContext(fullTableName, ddl, columns, columnsMapping, 
rowkeyType, null);
-       }
-
-       private void setConfig(final String configName, final String 
configValue) {
-               if (sinkContext == null){
-                       throw new NullPointerException();
-               }
-               if (configName == null){
-                       throw new NullPointerException();
-               }
-               if (configValue == null){
-                       throw new NullPointerException();
-               }
-               sinkContext.put(configName, configValue);
-       }
-
-       private int countRows(final String fullTableName) throws SQLException {
-               if (fullTableName == null){
-                       throw new NullPointerException();
-               }
-               Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-               final Connection conn = DriverManager.getConnection(getUrl(), 
props);
-               ResultSet rs = null;
-               try {
-                       rs = conn.createStatement().executeQuery("select 
count(*) from " + fullTableName);
-                       int rowsCount = 0;
-                       while (rs.next()) {
-                               rowsCount = rs.getInt(1);
-                       }
-                       return rowsCount;
-
-               } finally {
-                       if (rs != null) {
-                               rs.close();
-                       }
-                       if (conn != null) {
-                               conn.close();
-                       }
-               }
-
-       }
-
-       private void dropTable(final String fullTableName) throws SQLException {
-               if (fullTableName == null){
-                       throw new NullPointerException();
-               }
-               Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-               final Connection conn = DriverManager.getConnection(getUrl(), 
props);
-               try {
-                       conn.createStatement().execute("drop table if exists " 
+ fullTableName);
-               } finally {
-                       if (conn != null) {
-                               conn.close();
-                       }
-               }
-       }
-
-}
diff --git 
a/phoenix5-flume/src/it/java/org/apache/phoenix/flume/PhoenixSinkIT.java 
b/phoenix5-flume/src/it/java/org/apache/phoenix/flume/PhoenixSinkIT.java
deleted file mode 100644
index b9fe978..0000000
--- a/phoenix5-flume/src/it/java/org/apache/phoenix/flume/PhoenixSinkIT.java
+++ /dev/null
@@ -1,290 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.flume;
-
-import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.util.Properties;
-
-import org.apache.flume.Channel;
-import org.apache.flume.Context;
-import org.apache.flume.Sink;
-import org.apache.flume.SinkFactory;
-import org.apache.flume.Transaction;
-import org.apache.flume.channel.MemoryChannel;
-import org.apache.flume.conf.Configurables;
-import org.apache.flume.event.EventBuilder;
-import org.apache.flume.lifecycle.LifecycleState;
-import org.apache.flume.sink.DefaultSinkFactory;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
-import org.apache.phoenix.flume.serializer.CustomSerializer;
-import org.apache.phoenix.flume.serializer.EventSerializers;
-import org.apache.phoenix.flume.sink.NullPhoenixSink;
-import org.apache.phoenix.flume.sink.PhoenixSink;
-import org.apache.phoenix.util.PropertiesUtil;
-import org.apache.phoenix.util.ReadOnlyProps;
-import org.apache.phoenix.util.TestUtil;
-import org.apache.phoenix.query.BaseTest;
-import org.junit.Assert;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-@Category(NeedsOwnMiniClusterTest.class)
-public class PhoenixSinkIT extends BaseTest {
-
-    private Context sinkContext;
-    private PhoenixSink sink;
-
-    @BeforeClass
-    public static synchronized void doSetup() throws Exception {
-        setUpTestDriver(ReadOnlyProps.EMPTY_PROPS);
-    }
-
-    @AfterClass
-    public static synchronized void doTeardown() throws Exception {
-        dropNonSystemTables();
-    }
-
-    @After
-    public void cleanUpAfterTest() throws Exception {
-        deletePriorMetaData(HConstants.LATEST_TIMESTAMP, getUrl());
-    }
-
-    @Test
-    public void testSinkCreation() {
-        SinkFactory factory = new DefaultSinkFactory ();
-        Sink sink = factory.create("PhoenixSink__", 
"org.apache.phoenix.flume.sink.PhoenixSink");
-        Assert.assertNotNull(sink);
-        Assert.assertTrue(PhoenixSink.class.isInstance(sink));
-    }
-    @Test
-    public void testConfiguration () {
-        
-        sinkContext = new Context ();
-        sinkContext.put(FlumeConstants.CONFIG_TABLE, "test");
-        sinkContext.put(FlumeConstants.CONFIG_JDBC_URL, getUrl());
-        
sinkContext.put(FlumeConstants.CONFIG_SERIALIZER,EventSerializers.REGEX.name());
-        sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX + 
FlumeConstants.CONFIG_COLUMN_NAMES,"col1,col2");
-        sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX + 
FlumeConstants.CONFIG_ROWKEY_TYPE_GENERATOR,DefaultKeyGenerator.TIMESTAMP.name());
-
-        sink = new PhoenixSink();
-        Configurables.configure(sink, sinkContext);
-    }
-    
-    
-    
-    @Test(expected = NullPointerException.class)
-    public void testInvalidConfiguration () {
-        
-        sinkContext = new Context ();
-        sinkContext.put(FlumeConstants.CONFIG_JDBC_URL, getUrl());
-        
sinkContext.put(FlumeConstants.CONFIG_SERIALIZER,EventSerializers.REGEX.name());
-        sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX + 
FlumeConstants.CONFIG_COLUMN_NAMES,"col1,col2");
-        sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX + 
FlumeConstants.CONFIG_ROWKEY_TYPE_GENERATOR,DefaultKeyGenerator.TIMESTAMP.name());
-
-        sink = new PhoenixSink();
-        Configurables.configure(sink, sinkContext);
-    }
-    
-    @Test(expected=RuntimeException.class)
-    public void testInvalidConfigurationOfSerializer () {
-        
-        sinkContext = new Context ();
-        sinkContext.put(FlumeConstants.CONFIG_TABLE, "test");
-        sinkContext.put(FlumeConstants.CONFIG_JDBC_URL, getUrl());
-        sinkContext.put(FlumeConstants.CONFIG_SERIALIZER,"unknown");
-        sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX + 
FlumeConstants.CONFIG_COLUMN_NAMES,"col1,col2");
-        sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX + 
FlumeConstants.CONFIG_ROWKEY_TYPE_GENERATOR,DefaultKeyGenerator.TIMESTAMP.name());
-
-        sink = new PhoenixSink();
-        Configurables.configure(sink, sinkContext);
-    }
-    
-    @Test
-    public void testInvalidTable() {
-        sinkContext = new Context ();
-        sinkContext.put(FlumeConstants.CONFIG_TABLE, "flume_test");
-        sinkContext.put(FlumeConstants.CONFIG_JDBC_URL, getUrl());
-        sinkContext.put(FlumeConstants.CONFIG_SERIALIZER, 
EventSerializers.REGEX.name());
-        sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX + 
FlumeConstants.CONFIG_COLUMN_NAMES, "col1,col2");
-        sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX + 
FlumeConstants.CONFIG_ROWKEY_TYPE_GENERATOR,DefaultKeyGenerator.TIMESTAMP.name());
-
-        sink = new PhoenixSink();
-        Configurables.configure(sink, sinkContext);
-
-        final Channel channel = this.initChannel();
-        sink.setChannel(channel);
-        try {
-            sink.start();
-            fail();
-        }catch(Exception e) {
-            assertTrue(e.getMessage(), e.getMessage().contains("ERROR 1012 
(42M03): Table undefined."));
-        }
-    }
-    
-    @Test
-    public void testSinkLifecycle () {
-        String tableName = generateUniqueName();
-
-        String ddl = "CREATE TABLE " + tableName +
-                "  (flume_time timestamp not null, col1 varchar , col2 
varchar" +
-                "  CONSTRAINT pk PRIMARY KEY (flume_time))\n";
-        
-        sinkContext = new Context ();
-        sinkContext.put(FlumeConstants.CONFIG_TABLE,  tableName);
-        sinkContext.put(FlumeConstants.CONFIG_JDBC_URL, getUrl());
-        
sinkContext.put(FlumeConstants.CONFIG_SERIALIZER,EventSerializers.REGEX.name());
-        sinkContext.put(FlumeConstants.CONFIG_TABLE_DDL, ddl);
-        sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX + 
FlumeConstants.CONFIG_REGULAR_EXPRESSION,"^([^\t]+)\t([^\t]+)$");
-        sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX + 
FlumeConstants.CONFIG_COLUMN_NAMES,"col1,col2");
-        sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX + 
FlumeConstants.CONFIG_ROWKEY_TYPE_GENERATOR,DefaultKeyGenerator.TIMESTAMP.name());
-
-
-        sink = new PhoenixSink();
-        Configurables.configure(sink, sinkContext);
-        Assert.assertEquals(LifecycleState.IDLE, sink.getLifecycleState());
-
-        final Channel channel = this.initChannel();
-        sink.setChannel(channel);
-
-        sink.start();
-        Assert.assertEquals(LifecycleState.START, sink.getLifecycleState());
-        sink.stop();
-        Assert.assertEquals(LifecycleState.STOP, sink.getLifecycleState());
-    }
-    
-    @Test
-    public void testCreateTable () throws Exception {
-        String tableName = generateUniqueName();
-        String ddl = "CREATE TABLE " + tableName + " " +
-                "  (flume_time timestamp not null, col1 varchar , col2 
varchar" +
-                "  CONSTRAINT pk PRIMARY KEY (flume_time))\n";
-
-        final String fullTableName =  tableName;
-        sinkContext = new Context ();
-        sinkContext.put(FlumeConstants.CONFIG_TABLE, fullTableName);
-        sinkContext.put(FlumeConstants.CONFIG_JDBC_URL, getUrl());
-        
sinkContext.put(FlumeConstants.CONFIG_SERIALIZER,EventSerializers.REGEX.name());
-        sinkContext.put(FlumeConstants.CONFIG_TABLE_DDL, ddl);
-        sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX + 
FlumeConstants.CONFIG_REGULAR_EXPRESSION,"^([^\t]+)\t([^\t]+)$");
-        sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX + 
FlumeConstants.CONFIG_COLUMN_NAMES,"col1,col2");
-        sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX + 
FlumeConstants.CONFIG_ROWKEY_TYPE_GENERATOR, 
DefaultKeyGenerator.TIMESTAMP.name());
-
-
-        sink = new PhoenixSink();
-        Configurables.configure(sink, sinkContext);
-        Assert.assertEquals(LifecycleState.IDLE, sink.getLifecycleState());
-
-        final Channel channel = this.initChannel();
-        sink.setChannel(channel);
-        
-        sink.start();
-        try {
-            Assert.assertTrue(driver.getConnectionQueryServices(getUrl(), 
TestUtil.TEST_PROPERTIES).getAdmin()
-                    .tableExists(TableName.valueOf(fullTableName)));
-        } finally {
-            admin.close();
-        }
-    }
-
-    @Test
-    public void testExtendedSink() throws Exception {
-        // Create a mock NullPhoenixSink which extends PhoenixSink, and verify 
configure is invoked()
-
-        PhoenixSink sink = mock(NullPhoenixSink.class);
-        sinkContext = new Context();
-        sinkContext.put(FlumeConstants.CONFIG_TABLE, "FLUME_TEST_EXTENDED");
-        sinkContext.put(FlumeConstants.CONFIG_JDBC_URL, getUrl());
-        sinkContext.put(FlumeConstants.CONFIG_SERIALIZER, 
CustomSerializer.class.getName());
-        sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX + 
FlumeConstants.CONFIG_COLUMN_NAMES, "ID, COUNTS");
-        sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX + 
FlumeConstants.CONFIG_ROWKEY_TYPE_GENERATOR, 
DefaultKeyGenerator.TIMESTAMP.name());
-
-        Configurables.configure(sink, sinkContext);
-        verify(sink).configure(sinkContext);
-    }
-
-    @Test
-    public void testExtendedSerializer() throws Exception {
-        /*
-        Sadly, we can't mock a serializer, as the PhoenixSink does a 
Class.forName() to instantiate
-        it. Instead. we'll setup a Flume channel and verify the data our 
custom serializer wrote.
-        */
-
-        final String fullTableName = "FLUME_TEST_EXTENDED";
-        final String ddl = "CREATE TABLE " + fullTableName + " (ID BIGINT NOT 
NULL PRIMARY KEY, COUNTS UNSIGNED_LONG)";
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        final Connection conn = DriverManager.getConnection(getUrl(), props);
-        conn.createStatement().execute(ddl);
-        conn.commit();
-
-        sinkContext = new Context();
-        sinkContext.put(FlumeConstants.CONFIG_TABLE, "FLUME_TEST_EXTENDED");
-        sinkContext.put(FlumeConstants.CONFIG_JDBC_URL, getUrl());
-        sinkContext.put(FlumeConstants.CONFIG_SERIALIZER, 
CustomSerializer.class.getName());
-        sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX + 
FlumeConstants.CONFIG_COLUMN_NAMES, "ID, COUNTS");
-        sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX + 
FlumeConstants.CONFIG_ROWKEY_TYPE_GENERATOR, 
DefaultKeyGenerator.TIMESTAMP.name());
-
-        PhoenixSink sink = new PhoenixSink();
-        Configurables.configure(sink, sinkContext);
-
-        // Send a test event through Flume, using our custom serializer
-        final Channel channel = this.initChannel();
-        sink.setChannel(channel);
-        sink.start();
-
-        final Transaction transaction = channel.getTransaction();
-        transaction.begin();
-        channel.put(EventBuilder.withBody(Bytes.toBytes("test event")));
-        transaction.commit();
-        transaction.close();
-
-        sink.process();
-        sink.stop();
-
-        // Verify our serializer wrote out data
-        ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM 
FLUME_TEST_EXTENDED");
-        assertTrue(rs.next());
-        assertTrue(rs.getLong(1) == 1L);
-    }
-    
-    private Channel initChannel() {
-        //Channel configuration
-        Context channelContext = new Context();
-        channelContext.put("capacity", "10000");
-        channelContext.put("transactionCapacity", "200");
-
-        Channel channel = new MemoryChannel();
-        channel.setName("memorychannel");
-        Configurables.configure(channel, channelContext);
-        return channel;
-    }
-    
-    
-}
diff --git 
a/phoenix5-flume/src/it/java/org/apache/phoenix/flume/RegexEventSerializerIT.java
 
b/phoenix5-flume/src/it/java/org/apache/phoenix/flume/RegexEventSerializerIT.java
deleted file mode 100644
index b849f10..0000000
--- 
a/phoenix5-flume/src/it/java/org/apache/phoenix/flume/RegexEventSerializerIT.java
+++ /dev/null
@@ -1,447 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.flume;
-
-import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-import org.apache.flume.Channel;
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.EventDeliveryException;
-import org.apache.flume.Transaction;
-import org.apache.flume.channel.MemoryChannel;
-import org.apache.flume.conf.Configurables;
-import org.apache.flume.event.EventBuilder;
-import org.apache.flume.lifecycle.LifecycleState;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
-import org.apache.phoenix.flume.serializer.EventSerializers;
-import org.apache.phoenix.flume.sink.PhoenixSink;
-import org.apache.phoenix.util.PropertiesUtil;
-import org.apache.phoenix.util.ReadOnlyProps;
-import org.apache.phoenix.query.BaseTest;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-@Category(NeedsOwnMiniClusterTest.class)
-public class RegexEventSerializerIT extends BaseTest {
-
-    private Context sinkContext;
-    private PhoenixSink sink;
-
-    @BeforeClass
-    public static synchronized void doSetup() throws Exception {
-        setUpTestDriver(ReadOnlyProps.EMPTY_PROPS);
-    }
-
-    @AfterClass
-    public static synchronized void doTeardown() throws Exception {
-        dropNonSystemTables();
-    }
-
-    @After
-    public void cleanUpAfterTest() throws Exception {
-        deletePriorMetaData(HConstants.LATEST_TIMESTAMP, getUrl());
-    }
-
-    @Test
-    public void testKeyGenerator() throws EventDeliveryException, SQLException 
{
-        
-        final String fullTableName = generateUniqueName();
-        initSinkContextWithDefaults(fullTableName);
-        
-        sink = new PhoenixSink();
-        Configurables.configure(sink, sinkContext);
- 
-        assertEquals(LifecycleState.IDLE, sink.getLifecycleState());
-      
-        final Channel channel = this.initChannel();
-        sink.setChannel(channel);
-        
-        sink.start();
-        final String eventBody = "val1\tval2";
-        final Event event = EventBuilder.withBody(Bytes.toBytes(eventBody));
-        // put event in channel
-        Transaction transaction = channel.getTransaction();
-        transaction.begin();
-        channel.put(event);
-        transaction.commit();
-        transaction.close();
-
-        sink.process();
-       
-        int rowsInDb = countRows(fullTableName);
-        assertEquals(1 , rowsInDb);
-        
-        sink.stop();
-        assertEquals(LifecycleState.STOP, sink.getLifecycleState());
-         
-    }
-    
-    
-    @Test
-    public void testMismatchKeyGenerator() throws EventDeliveryException, 
SQLException {
-        
-        final String fullTableName = generateUniqueName();
-        initSinkContextWithDefaults(fullTableName);
-        setConfig(FlumeConstants.CONFIG_SERIALIZER_PREFIX + 
FlumeConstants.CONFIG_ROWKEY_TYPE_GENERATOR,DefaultKeyGenerator.UUID.name());
-     
-        sink = new PhoenixSink();
-        Configurables.configure(sink, sinkContext);
-        assertEquals(LifecycleState.IDLE, sink.getLifecycleState());
-       
-      
-        final Channel channel = this.initChannel();
-        sink.setChannel(channel);
-        
-        sink.start();
-        final String eventBody = "val1\tval2";
-        final Event event = EventBuilder.withBody(Bytes.toBytes(eventBody));
-        // put event in channel
-        Transaction transaction = channel.getTransaction();
-        transaction.begin();
-        channel.put(event);
-        transaction.commit();
-        transaction.close();
-
-        try {
-            sink.process();
-            fail();
-        }catch(Exception ex){
-            
assertTrue(ex.getCause().getMessage().contains("java.lang.IllegalArgumentException:
 Invalid format:"));
-        }
-     }
-    
-    @Test
-    public void testMissingColumnsInEvent() throws EventDeliveryException, 
SQLException {
-        
-        final String fullTableName = generateUniqueName();
-        initSinkContextWithDefaults(fullTableName);
-      
-        sink = new PhoenixSink();
-        Configurables.configure(sink, sinkContext);
-        assertEquals(LifecycleState.IDLE, sink.getLifecycleState());
-      
-        final Channel channel = this.initChannel();
-        sink.setChannel(channel);
-        
-        sink.start();
-        final String eventBody = "val1";
-        final Event event = EventBuilder.withBody(Bytes.toBytes(eventBody));
-        // put event in channel
-        Transaction transaction = channel.getTransaction();
-        transaction.begin();
-        channel.put(event);
-        transaction.commit();
-        transaction.close();
-
-        sink.process();
-        
-        int rowsInDb = countRows(fullTableName);
-        assertEquals(0 , rowsInDb);
-           
-        sink.stop();
-        assertEquals(LifecycleState.STOP, sink.getLifecycleState());
-        
-    }
-    
-    @Test
-    public void testBatchEvents() throws EventDeliveryException, SQLException {
-        
-        final String fullTableName = generateUniqueName();
-        initSinkContextWithDefaults(fullTableName);
-      
-        sink = new PhoenixSink();
-        Configurables.configure(sink, sinkContext);
-        assertEquals(LifecycleState.IDLE, sink.getLifecycleState());
-      
-        final Channel channel = this.initChannel();
-        sink.setChannel(channel);
-        
-        sink.start();
-        int numEvents = 150;
-        String col1 = "val1";
-        String col2 = "val2";
-        String eventBody = null;
-        List<Event> eventList = new ArrayList<>(numEvents);
-        for(int i = 0 ; i < eventList.size() ; i++) {
-            eventBody = (col1 + i) + "\t" + (col2 + i);
-            Event event = EventBuilder.withBody(Bytes.toBytes(eventBody));
-            eventList.add(event);
-        }
-       
-        // put event in channel
-        Transaction transaction = channel.getTransaction();
-        transaction.begin();
-        for(Event event : eventList) {
-            channel.put(event);
-        }
-        transaction.commit();
-        transaction.close();
-
-        sink.process();
-        
-        int rowsInDb = countRows(fullTableName);
-        assertEquals(eventList.size(), rowsInDb);
-        
-        sink.stop();
-        assertEquals(LifecycleState.STOP, sink.getLifecycleState());
-        
-    }
-    
-    @Test
-    public void testApacheLogRegex() throws Exception {
-        
-        sinkContext = new Context ();
-        final String fullTableName = generateUniqueName();
-        final String logRegex = "([^ ]*) ([^ ]*) ([^ ]*) (-|\\[[^\\]]*\\]) 
\"([^ ]+) ([^ ]+)" +
-                                " ([^\"]+)\" (-|[0-9]*) (-|[0-9]*)(?: ([^ 
\"]*|\"[^\"]*\")" +
-                                " ([^ \"]*|\"[^\"]*\"))?";
-        
-        final String columns = 
"host,identity,user,time,method,request,protocol,status,size,referer,agent";
-        
-        String ddl = "CREATE TABLE " + fullTableName +
-                "  (uid VARCHAR NOT NULL, user VARCHAR, time varchar, host 
varchar , identity varchar, method varchar, request varchar , protocol 
varchar," +
-                "  status integer , size integer , referer varchar , agent 
varchar CONSTRAINT pk PRIMARY KEY (uid))\n";
-       
-        sinkContext.put(FlumeConstants.CONFIG_TABLE, fullTableName);
-        sinkContext.put(FlumeConstants.CONFIG_JDBC_URL, getUrl());
-        
sinkContext.put(FlumeConstants.CONFIG_SERIALIZER,EventSerializers.REGEX.name());
-        sinkContext.put(FlumeConstants.CONFIG_TABLE_DDL, ddl);
-        sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX + 
FlumeConstants.CONFIG_REGULAR_EXPRESSION,logRegex);
-        sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX + 
FlumeConstants.CONFIG_COLUMN_NAMES,columns);
-        sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX + 
FlumeConstants.CONFIG_ROWKEY_TYPE_GENERATOR,DefaultKeyGenerator.UUID.name());
-       
-        String message1 = "33.22.11.00 - user1 [12/Dec/2013:07:01:19 +0000] " +
-                "\"GET /wp-admin/css/install.css HTTP/1.0\" 200 813 " + 
-                "\"http://www.google.com\"; \"Mozilla/5.0 (comp" +
-                "atible; Yahoo! Slurp; 
http://help.yahoo.com/help/us/ysearch/slurp)\"";
-        
-        String message2 = "192.168.20.1 - user2 [13/Dec/2013:06:05:19 +0000] " 
+
-                "\"GET /wp-admin/css/install.css HTTP/1.0\" 400 363 " + 
-                "\"http://www.salesforce.com/in/?ir=1\"; \"Mozilla/5.0 (comp" +
-                "atible;)\"";
-        
-        
-        sink = new PhoenixSink();
-        Configurables.configure(sink, sinkContext);
-        assertEquals(LifecycleState.IDLE, sink.getLifecycleState());
-      
-        final Channel channel = this.initChannel();
-        sink.setChannel(channel);
-        
-        sink.start();
-        
-        final Event event1 = EventBuilder.withBody(Bytes.toBytes(message1));
-        final Event event2 = EventBuilder.withBody(Bytes.toBytes(message2));
-        
-        final Transaction transaction = channel.getTransaction();
-        transaction.begin();
-        channel.put(event1);
-        channel.put(event2);
-        transaction.commit();
-        transaction.close();
-
-        sink.process();
-   
-        final String query = " SELECT * FROM \n " + fullTableName;
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        final ResultSet rs ;
-        final Connection conn = DriverManager.getConnection(getUrl(), props);
-        try{
-            rs = conn.createStatement().executeQuery(query);
-            assertTrue(rs.next());
-            assertTrue(rs.next());
-             
-        }finally {
-            if(conn != null) {
-                conn.close();
-            }
-        }
-        sink.stop();
-        assertEquals(LifecycleState.STOP, sink.getLifecycleState());
-        
-    }
-    
-   
-    @Test
-    public void testEventsWithHeaders() throws Exception {
-        
-        sinkContext = new Context ();
-        final String fullTableName = generateUniqueName();
-        final String ddl = "CREATE TABLE " + fullTableName +
-                "  (rowkey VARCHAR not null, col1 varchar , cf1.col2 varchar , 
host varchar , source varchar \n" +
-                "  CONSTRAINT pk PRIMARY KEY (rowkey))\n";
-       
-        sinkContext.put(FlumeConstants.CONFIG_TABLE, fullTableName);
-        sinkContext.put(FlumeConstants.CONFIG_JDBC_URL, getUrl());
-        
sinkContext.put(FlumeConstants.CONFIG_SERIALIZER,EventSerializers.REGEX.name());
-        sinkContext.put(FlumeConstants.CONFIG_TABLE_DDL, ddl);
-        sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX + 
FlumeConstants.CONFIG_REGULAR_EXPRESSION,"^([^\t]+)\t([^\t]+)$");
-        sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX + 
FlumeConstants.CONFIG_COLUMN_NAMES,"col1,cf1.col2");
-        sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX + 
FlumeConstants.CONFIG_HEADER_NAMES,"host,source");
-        sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX + 
FlumeConstants.CONFIG_ROWKEY_TYPE_GENERATOR,DefaultKeyGenerator.UUID.name());   
    
-        
-        sink = new PhoenixSink();
-        Configurables.configure(sink, sinkContext);
-        assertEquals(LifecycleState.IDLE, sink.getLifecycleState());
-      
-        final Channel channel = this.initChannel();
-        sink.setChannel(channel);
-        
-        sink.start();
-        
-        int numEvents = 10;
-        String col1 = "val1";
-        String col2 = "val2";
-        String hostHeader = "host1";
-        String sourceHeader = "source1";
-        String eventBody = null;
-        List<Event> eventList = new ArrayList<>(numEvents);
-        for(int i = 0 ; i < numEvents ; i++) {
-            eventBody = (col1 + i) + "\t" + (col2 + i);
-            Map<String, String> headerMap = new HashMap<>(2);
-            headerMap.put("host",hostHeader);
-            headerMap.put("source",sourceHeader);
-            Event event = 
EventBuilder.withBody(Bytes.toBytes(eventBody),headerMap);
-            eventList.add(event);
-        }
-       
-        // put event in channel
-        Transaction transaction = channel.getTransaction();
-        transaction.begin();
-        for(Event event : eventList) {
-            channel.put(event);
-        }
-        transaction.commit();
-        transaction.close();
-        
-        sink.process();
-   
-        final String query = " SELECT * FROM \n " + fullTableName;
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        final ResultSet rs ;
-        final Connection conn = DriverManager.getConnection(getUrl(), props);
-        try{
-            rs = conn.createStatement().executeQuery(query);
-            assertTrue(rs.next());
-            assertEquals("host1",rs.getString("host"));
-            assertEquals("source1",rs.getString("source"));
-            
-            assertTrue(rs.next());
-            assertEquals("host1",rs.getString("host"));
-            assertEquals("source1",rs.getString("source")); 
-        }finally {
-            if(conn != null) {
-                conn.close();
-            }
-        }
-        sink.stop();
-        assertEquals(LifecycleState.STOP, sink.getLifecycleState());
-        
-    }
-    
-    private Channel initChannel() {
-        //Channel configuration
-        Context channelContext = new Context();
-        channelContext.put("capacity", "10000");
-        channelContext.put("transactionCapacity", "200");
-
-        Channel channel = new MemoryChannel();
-        channel.setName("memorychannel");
-        Configurables.configure(channel, channelContext);
-        return channel;
-    }
-    
-    private void initSinkContextWithDefaults(final String fullTableName) {
-        if (fullTableName == null){
-            throw new NullPointerException();
-        }
-        sinkContext = new Context ();
-        String ddl = "CREATE TABLE " + fullTableName +
-                "  (flume_time timestamp not null, col1 varchar , col2 
varchar" +
-                "  CONSTRAINT pk PRIMARY KEY (flume_time))\n";
-       
-        sinkContext.put(FlumeConstants.CONFIG_TABLE, fullTableName);
-        sinkContext.put(FlumeConstants.CONFIG_JDBC_URL, getUrl());
-        
sinkContext.put(FlumeConstants.CONFIG_SERIALIZER,EventSerializers.REGEX.name());
-        sinkContext.put(FlumeConstants.CONFIG_TABLE_DDL, ddl);
-        sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX + 
FlumeConstants.CONFIG_REGULAR_EXPRESSION,"^([^\t]+)\t([^\t]+)$");
-        sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX + 
FlumeConstants.CONFIG_COLUMN_NAMES,"col1,col2");
-        sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX + 
FlumeConstants.CONFIG_ROWKEY_TYPE_GENERATOR,DefaultKeyGenerator.TIMESTAMP.name());
-        
-      }
-    
-    private void setConfig(final String configName , final String configValue) 
{
-        if (sinkContext == null){
-            throw new NullPointerException();
-        }
-        if (configName == null){
-            throw new NullPointerException();
-        }
-        if (configValue == null){
-            throw new NullPointerException();
-        }
-        sinkContext.put(configName, configValue);
-    }
-    
-    private int countRows(final String fullTableName) throws SQLException {
-        if (fullTableName == null){
-            throw new NullPointerException();
-        }
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        final Connection conn = DriverManager.getConnection(getUrl(), props);
-        ResultSet rs = null ;
-        try{
-            rs  = conn.createStatement().executeQuery("select count(*) from 
"+fullTableName);
-            int rowsCount = 0;
-            while(rs.next()) {
-                rowsCount = rs.getInt(1);
-            }
-            return rowsCount;
-            
-        } finally {
-            if(rs != null) {
-                rs.close();
-            }
-            if(conn != null) {
-                conn.close();
-            }
-        }
-        
-       
-    }
-
-}
diff --git 
a/phoenix5-flume/src/it/java/org/apache/phoenix/flume/serializer/CustomSerializer.java
 
b/phoenix5-flume/src/it/java/org/apache/phoenix/flume/serializer/CustomSerializer.java
deleted file mode 100644
index 5db5fa6..0000000
--- 
a/phoenix5-flume/src/it/java/org/apache/phoenix/flume/serializer/CustomSerializer.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.flume.serializer;
-
-import java.sql.SQLException;
-import java.util.List;
-
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-
-public class CustomSerializer extends BaseEventSerializer {
-    @Override
-    public void doConfigure(Context context) {
-
-    }
-
-    @Override
-    public void doInitialize() throws SQLException {
-
-    }
-
-    @Override
-    public void upsertEvents(List<Event> events) throws SQLException {
-        // Just execute a sample UPSERT
-        connection.createStatement().execute("UPSERT INTO 
FLUME_TEST_EXTENDED(ID, COUNTS) VALUES(1, 1)");
-        connection.commit();
-    }
-}
diff --git 
a/phoenix5-flume/src/it/java/org/apache/phoenix/flume/sink/NullPhoenixSink.java 
b/phoenix5-flume/src/it/java/org/apache/phoenix/flume/sink/NullPhoenixSink.java
deleted file mode 100644
index 1df52e1..0000000
--- 
a/phoenix5-flume/src/it/java/org/apache/phoenix/flume/sink/NullPhoenixSink.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.flume.sink;
-
-public class NullPhoenixSink extends PhoenixSink {
-}
diff --git 
a/phoenix5-flume/src/main/java/org/apache/phoenix/flume/DefaultKeyGenerator.java
 
b/phoenix5-flume/src/main/java/org/apache/phoenix/flume/DefaultKeyGenerator.java
deleted file mode 100644
index 3820c2a..0000000
--- 
a/phoenix5-flume/src/main/java/org/apache/phoenix/flume/DefaultKeyGenerator.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.flume;
-
-import java.sql.Date;
-import java.sql.Timestamp;
-import java.util.Random;
-
-import org.apache.phoenix.util.DateUtil;
-
-public enum DefaultKeyGenerator implements KeyGenerator {
-
-    UUID  {
-
-        @Override
-        public String generate() {
-           return String.valueOf(java.util.UUID.randomUUID());
-        }
-         
-    },
-    TIMESTAMP {
-
-        @Override
-        public String generate() {
-            java.sql.Timestamp ts = new Timestamp(System.currentTimeMillis());
-            return DateUtil.DEFAULT_DATE_FORMATTER.format(ts);
-        }
-        
-    },
-    DATE {
-        
-        @Override
-        public String generate() {
-            Date dt =  new Date(System.currentTimeMillis());
-            return DateUtil.DEFAULT_DATE_FORMATTER.format(dt);
-        } 
-    },
-    RANDOM {
-
-        @Override
-        public String generate() {
-            return String.valueOf(new Random().nextLong());
-        }
-        
-    },
-    NANOTIMESTAMP {
-
-        @Override
-        public String generate() {
-            return String.valueOf(System.nanoTime());
-        }
-        
-    };
-}
diff --git 
a/phoenix5-flume/src/main/java/org/apache/phoenix/flume/FlumeConstants.java 
b/phoenix5-flume/src/main/java/org/apache/phoenix/flume/FlumeConstants.java
deleted file mode 100644
index a146bbe..0000000
--- a/phoenix5-flume/src/main/java/org/apache/phoenix/flume/FlumeConstants.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.flume;
-
-public final class FlumeConstants {
-
-    /**
-     * The Hbase table which the sink should write to.
-     */
-    public static final String CONFIG_TABLE = "table";
-    /**
-     * The ddl query for the Hbase table where events are ingested to.
-     */
-    public static final String CONFIG_TABLE_DDL = "ddl";
-    /**
-     * Maximum number of events the sink should take from the channel per 
transaction, if available.
-     */
-    public static final String CONFIG_BATCHSIZE = "batchSize";
-    /**
-     * The fully qualified class name of the serializer the sink should use.
-     */
-    public static final String CONFIG_SERIALIZER = "serializer";
-    /**
-     * Configuration to pass to the serializer.
-     */
-    public static final String CONFIG_SERIALIZER_PREFIX = CONFIG_SERIALIZER + 
".";
-
-    /**
-     * Configuration for the zookeeper quorum.
-     */
-    public static final String CONFIG_ZK_QUORUM = "zookeeperQuorum";
-    
-    /**
-     * Configuration for the jdbc url.
-     */
-    public static final String CONFIG_JDBC_URL = "jdbcUrl";
-
-    /**
-     * Default batch size .
-     */
-    public static final Integer DEFAULT_BATCH_SIZE = 100;
-
-    /** Regular expression used to parse groups from event data. */
-    public static final String CONFIG_REGULAR_EXPRESSION = "regex";
-    public static final String REGEX_DEFAULT = "(.*)";
-
-    /** Whether to ignore case when performing regex matches. */
-    public static final String IGNORE_CASE_CONFIG = "regexIgnoreCase";
-    public static final boolean IGNORE_CASE_DEFAULT = false;
-
-    /** JSON expression used to parse groups from event data. */
-    public static final String CONFIG_COLUMNS_MAPPING = "columnsMapping";
-    public static final String CONFIG_PARTIAL_SCHEMA = "partialSchema";
-    public static final String JSON_DEFAULT = "{}";
-
-    /** CSV expression used to parse groups from event data. */
-    public static final String CSV_DELIMITER = "csvDelimiter";
-    public static final String CSV_DELIMITER_DEFAULT = ",";
-    public static final String CSV_QUOTE = "csvQuote";
-    public static final String CSV_QUOTE_DEFAULT = "\"";
-    public static final String CSV_ESCAPE = "csvEscape";
-    public static final String CSV_ESCAPE_DEFAULT = "\\";
-    public static final String CSV_ARRAY_DELIMITER = "csvArrayDelimiter";
-    public static final String CSV_ARRAY_DELIMITER_DEFAULT = ",";
-
-    /** Comma separated list of column names . */
-    public static final String CONFIG_COLUMN_NAMES = "columns";
-
-    /** The header columns to persist as columns into the default column 
family. */
-    public static final String CONFIG_HEADER_NAMES = "headers";
-
-    /** The rowkey type generator . */
-    public static final String CONFIG_ROWKEY_TYPE_GENERATOR = "rowkeyType";
-
-    /**
-     * The default delimiter for columns and headers
-     */
-    public static final String DEFAULT_COLUMNS_DELIMITER = ",";
-}
diff --git 
a/phoenix5-flume/src/main/java/org/apache/phoenix/flume/KeyGenerator.java 
b/phoenix5-flume/src/main/java/org/apache/phoenix/flume/KeyGenerator.java
deleted file mode 100644
index d823a56..0000000
--- a/phoenix5-flume/src/main/java/org/apache/phoenix/flume/KeyGenerator.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.flume;
-
-public interface KeyGenerator {
-
-    public String generate();
-}
-
diff --git 
a/phoenix5-flume/src/main/java/org/apache/phoenix/flume/SchemaHandler.java 
b/phoenix5-flume/src/main/java/org/apache/phoenix/flume/SchemaHandler.java
deleted file mode 100644
index 206b3e5..0000000
--- a/phoenix5-flume/src/main/java/org/apache/phoenix/flume/SchemaHandler.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.flume;
-
-import java.sql.Connection;
-import java.sql.SQLException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class SchemaHandler {
-
-    private static final Logger logger = 
LoggerFactory.getLogger(SchemaHandler.class);
-
-    public static boolean createTable(Connection connection, String 
createTableDdl) {
-        if (connection == null) {
-            throw new NullPointerException();
-        }
-        if (createTableDdl == null) {
-            throw new NullPointerException();
-        }
-        boolean status  = true;
-        try {
-            status = connection.createStatement().execute(createTableDdl);
-        } catch (SQLException e) {
-            logger.error("An error occurred during executing the create table 
ddl {} ",createTableDdl);
-            throw new RuntimeException(e);
-        }
-        return status;
-
-    }
-
-}
diff --git 
a/phoenix5-flume/src/main/java/org/apache/phoenix/flume/serializer/BaseEventSerializer.java
 
b/phoenix5-flume/src/main/java/org/apache/phoenix/flume/serializer/BaseEventSerializer.java
deleted file mode 100644
index 01836a9..0000000
--- 
a/phoenix5-flume/src/main/java/org/apache/phoenix/flume/serializer/BaseEventSerializer.java
+++ /dev/null
@@ -1,252 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.flume.serializer;
-
-import static org.apache.phoenix.flume.FlumeConstants.CONFIG_COLUMN_NAMES;
-import static org.apache.phoenix.flume.FlumeConstants.CONFIG_HEADER_NAMES;
-import static 
org.apache.phoenix.flume.FlumeConstants.CONFIG_ROWKEY_TYPE_GENERATOR;
-import static 
org.apache.phoenix.flume.FlumeConstants.DEFAULT_COLUMNS_DELIMITER;
-import static org.apache.phoenix.util.PhoenixRuntime.UPSERT_BATCH_SIZE_ATTRIB;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-import org.apache.flume.Context;
-import org.apache.flume.conf.ComponentConfiguration;
-import org.apache.phoenix.exception.SQLExceptionCode;
-import org.apache.phoenix.exception.SQLExceptionInfo;
-import org.apache.phoenix.flume.DefaultKeyGenerator;
-import org.apache.phoenix.flume.FlumeConstants;
-import org.apache.phoenix.flume.KeyGenerator;
-import org.apache.phoenix.flume.SchemaHandler;
-import org.apache.phoenix.util.ColumnInfo;
-import org.apache.phoenix.util.QueryUtil;
-import org.apache.phoenix.util.SchemaUtil;
-import org.apache.phoenix.util.StringUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-public abstract class BaseEventSerializer implements EventSerializer {
-
-    private static final Logger logger = 
LoggerFactory.getLogger(BaseEventSerializer.class);
-
-    protected Connection connection;
-    protected String fullTableName;
-    protected ColumnInfo[] columnMetadata;
-    protected boolean autoGenerateKey = false;
-    protected KeyGenerator  keyGenerator;
-    protected List<String>  colNames = new ArrayList<>(10);
-    protected List<String>  headers  = new ArrayList<>(5);
-    protected String upsertStatement;
-    private   String jdbcUrl;
-    private   Integer batchSize;
-    private   String  createTableDdl;
-
-
-
-
-
-    @Override
-    public void configure(Context context) {
-
-        this.createTableDdl = 
context.getString(FlumeConstants.CONFIG_TABLE_DDL);
-        this.fullTableName = context.getString(FlumeConstants.CONFIG_TABLE);
-        final String zookeeperQuorum = 
context.getString(FlumeConstants.CONFIG_ZK_QUORUM);
-        final String ipJdbcURL = 
context.getString(FlumeConstants.CONFIG_JDBC_URL);
-        this.batchSize = context.getInteger(FlumeConstants.CONFIG_BATCHSIZE, 
FlumeConstants.DEFAULT_BATCH_SIZE);
-        final String columnNames = context.getString(CONFIG_COLUMN_NAMES);
-        final String headersStr = context.getString(CONFIG_HEADER_NAMES);
-        final String keyGeneratorType = 
context.getString(CONFIG_ROWKEY_TYPE_GENERATOR);
-
-        if (this.fullTableName == null) {
-            throw new NullPointerException(
-                "Table name cannot be empty, please specify in the 
configuration file");
-        }
-        if(zookeeperQuorum != null && !zookeeperQuorum.isEmpty()) {
-            this.jdbcUrl = QueryUtil.getUrl(zookeeperQuorum);
-        }
-        if(ipJdbcURL != null && !ipJdbcURL.isEmpty()) {
-            this.jdbcUrl = ipJdbcURL;
-        }
-        if (this.jdbcUrl == null) {
-            throw new NullPointerException(
-                "Please specify either the zookeeper quorum or the jdbc url in 
the configuration file");
-        }
-        if (columnNames == null) {
-            throw new NullPointerException(
-                "Column names cannot be empty, please specify in configuration 
file");
-        }
-        
colNames.addAll(Arrays.asList(columnNames.split(DEFAULT_COLUMNS_DELIMITER)));
-
-        if(headersStr != null && !headersStr.isEmpty()) {
-            
headers.addAll(Arrays.asList(headersStr.split(DEFAULT_COLUMNS_DELIMITER)));
-        }
-
-        if(keyGeneratorType != null && !keyGeneratorType.isEmpty()) {
-            try {
-                keyGenerator =  
DefaultKeyGenerator.valueOf(keyGeneratorType.toUpperCase());
-                this.autoGenerateKey = true;
-            } catch(IllegalArgumentException iae) {
-                logger.error("An invalid key generator {} was specified in 
configuration file. Specify one of 
{}",keyGeneratorType,DefaultKeyGenerator.values());
-                throw new RuntimeException(iae);
-            }
-        }
-
-        logger.debug(" the jdbcUrl configured is {}",jdbcUrl);
-        logger.debug(" columns configured are {}",colNames.toString());
-        logger.debug(" headers configured are {}",headersStr);
-        logger.debug(" the keyGenerator configured is {} ",keyGeneratorType);
-
-        doConfigure(context);
-
-    }
-
-    @Override
-    public void configure(ComponentConfiguration conf) {
-        // NO-OP
-
-    }
-
-
-    @Override
-    public void initialize() throws SQLException {
-        final Properties props = new Properties();
-        props.setProperty(UPSERT_BATCH_SIZE_ATTRIB, 
String.valueOf(this.batchSize));
-        ResultSet rs = null;
-        try {
-            this.connection = DriverManager.getConnection(this.jdbcUrl, props);
-            this.connection.setAutoCommit(false);
-            if(this.createTableDdl != null) {
-                SchemaHandler.createTable(connection,createTableDdl);
-            }
-
-
-            final Map<String,Integer> qualifiedColumnMap = new 
LinkedHashMap<>();
-            final Map<String,Integer> unqualifiedColumnMap = new 
LinkedHashMap<>();
-            final String schemaName = 
SchemaUtil.getSchemaNameFromFullName(fullTableName);
-            final String tableName  = 
SchemaUtil.getTableNameFromFullName(fullTableName);
-
-            String rowkey = null;
-            String  cq = null;
-            String  cf = null;
-            Integer dt = null;
-            rs = connection.getMetaData().getColumns("", 
StringUtil.escapeLike(SchemaUtil.normalizeIdentifier(schemaName)), 
StringUtil.escapeLike(SchemaUtil.normalizeIdentifier(tableName)), null);
-            while (rs.next()) {
-                cf = rs.getString(QueryUtil.COLUMN_FAMILY_POSITION);
-                cq = rs.getString(QueryUtil.COLUMN_NAME_POSITION);
-                // TODO: Fix this .. change `DATA_TYPE_POSITION` value 5 to 26
-                // dt = rs.getInt(QueryUtil.DATA_TYPE_POSITION);
-                dt = rs.getInt(26);
-                if(cf == null || cf.isEmpty()) {
-                    rowkey = cq; // this is required only when row key is auto 
generated
-                } else {
-                    qualifiedColumnMap.put(SchemaUtil.getColumnDisplayName(cf, 
cq), dt);
-                }
-                unqualifiedColumnMap.put(SchemaUtil.getColumnDisplayName(null, 
cq), dt);
-            }
-
-            //can happen when table not found in Hbase.
-            if(unqualifiedColumnMap.isEmpty()) {
-                throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.TABLE_UNDEFINED)
-                    .setTableName(tableName).build().buildException();
-            }
-
-            int colSize = colNames.size();
-            int headersSize = headers.size();
-            int totalSize = colSize + headersSize + ( autoGenerateKey ? 1 : 0);
-            columnMetadata = new ColumnInfo[totalSize] ;
-
-            int position = 0;
-            position = this.addToColumnMetadataInfo(colNames, 
qualifiedColumnMap, unqualifiedColumnMap, position);
-            position = this.addToColumnMetadataInfo(headers,  
qualifiedColumnMap, unqualifiedColumnMap, position);
-
-            if(autoGenerateKey) {
-                Integer sqlType = unqualifiedColumnMap.get(rowkey);
-                if (sqlType == null) {
-                    throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.PRIMARY_KEY_MISSING)
-                        
.setColumnName(rowkey).setTableName(fullTableName).build().buildException();
-                }
-                columnMetadata[position] = new ColumnInfo(rowkey, sqlType);
-                position++;
-            }
-
-            this.upsertStatement = 
QueryUtil.constructUpsertStatement(fullTableName, 
Arrays.asList(columnMetadata));
-            logger.info(" the upsert statement is {} " ,this.upsertStatement);
-
-        }  catch (SQLException e) {
-            logger.error("error {} occurred during initializing connection 
",e.getMessage());
-            throw e;
-        } finally {
-            if(rs != null) {
-                rs.close();
-            }
-        }
-        doInitialize();
-    }
-
-    private int addToColumnMetadataInfo(final List<String> columns , final 
Map<String,Integer> qualifiedColumnsInfoMap, Map<String, Integer> 
unqualifiedColumnsInfoMap, int position) throws SQLException {
-        if (columns == null) {
-            throw new NullPointerException();
-        }
-        if (qualifiedColumnsInfoMap == null) {
-            throw new NullPointerException();
-        }
-        if (unqualifiedColumnsInfoMap == null) {
-            throw new NullPointerException();
-        }
-        for (int i = 0 ; i < columns.size() ; i++) {
-            String columnName = 
SchemaUtil.normalizeIdentifier(columns.get(i).trim());
-            Integer sqlType = unqualifiedColumnsInfoMap.get(columnName);
-            if (sqlType == null) {
-                sqlType = qualifiedColumnsInfoMap.get(columnName);
-                if (sqlType == null) {
-                    throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.COLUMN_NOT_FOUND)
-                        
.setColumnName(columnName).setTableName(this.fullTableName).build().buildException();
-                }
-            }
-            columnMetadata[position] = new ColumnInfo(columnName, sqlType);
-            position++;
-        }
-        return position;
-    }
-
-    public abstract void doConfigure(Context context);
-
-    public abstract void doInitialize() throws SQLException;
-
-
-    @Override
-    public void close() {
-        if(connection != null) {
-            try {
-                connection.close();
-            } catch (SQLException e) {
-                logger.error(" Error while closing connection {} ");
-            }
-        }
-    }
-}
diff --git 
a/phoenix5-flume/src/main/java/org/apache/phoenix/flume/serializer/CsvEventSerializer.java
 
b/phoenix5-flume/src/main/java/org/apache/phoenix/flume/serializer/CsvEventSerializer.java
deleted file mode 100644
index 760b9c7..0000000
--- 
a/phoenix5-flume/src/main/java/org/apache/phoenix/flume/serializer/CsvEventSerializer.java
+++ /dev/null
@@ -1,209 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.flume.serializer;
-
-import static org.apache.phoenix.flume.FlumeConstants.CSV_DELIMITER;
-import static org.apache.phoenix.flume.FlumeConstants.CSV_DELIMITER_DEFAULT;
-import static org.apache.phoenix.flume.FlumeConstants.CSV_QUOTE;
-import static org.apache.phoenix.flume.FlumeConstants.CSV_QUOTE_DEFAULT;
-import static org.apache.phoenix.flume.FlumeConstants.CSV_ESCAPE;
-import static org.apache.phoenix.flume.FlumeConstants.CSV_ESCAPE_DEFAULT;
-import static org.apache.phoenix.flume.FlumeConstants.CSV_ARRAY_DELIMITER;
-import static 
org.apache.phoenix.flume.FlumeConstants.CSV_ARRAY_DELIMITER_DEFAULT;
-
-import java.io.IOException;
-import java.io.StringReader;
-import java.sql.Array;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.csv.CSVFormat;
-import org.apache.commons.csv.CSVParser;
-import org.apache.commons.csv.CSVRecord;
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.phoenix.schema.types.PDataType;
-import org.json.JSONArray;
-import org.json.JSONTokener;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-
-public class CsvEventSerializer extends BaseEventSerializer {
-
-       private static final Logger logger = 
LoggerFactory.getLogger(CsvEventSerializer.class);
-
-       private String csvDelimiter;
-       private String csvQuote;
-       private String csvEscape;
-       private String csvArrayDelimiter;
-       private CsvLineParser csvLineParser;
-
-       /**
-     * 
-     */
-       @Override
-       public void doConfigure(Context context) {
-               csvDelimiter = context.getString(CSV_DELIMITER, 
CSV_DELIMITER_DEFAULT);
-               csvQuote = context.getString(CSV_QUOTE, CSV_QUOTE_DEFAULT);
-               csvEscape = context.getString(CSV_ESCAPE, CSV_ESCAPE_DEFAULT);
-               csvArrayDelimiter = context.getString(CSV_ARRAY_DELIMITER, 
CSV_ARRAY_DELIMITER_DEFAULT);
-               csvLineParser = new 
CsvLineParser(csvDelimiter.toCharArray()[0], csvQuote.toCharArray()[0],
-                               csvEscape.toCharArray()[0]);
-       }
-
-       /**
-     * 
-     */
-       @Override
-       public void doInitialize() throws SQLException {
-               // NO-OP
-       }
-
-       @Override
-       public void upsertEvents(List<Event> events) throws SQLException {
-               if(events == null){
-                       throw new NullPointerException();
-               }
-               if(connection == null){
-                       throw new NullPointerException();
-               }
-               if(this.upsertStatement == null){
-                       throw new NullPointerException();
-               }
-
-               boolean wasAutoCommit = connection.getAutoCommit();
-               connection.setAutoCommit(false);
-               try (PreparedStatement colUpsert = 
connection.prepareStatement(upsertStatement)) {
-                       String value = null;
-                       Integer sqlType = null;
-                       for (Event event : events) {
-                               byte[] payloadBytes = event.getBody();
-                               if (payloadBytes == null || payloadBytes.length 
== 0) {
-                                       continue;
-                               }
-                               String payload = new String(payloadBytes);
-                               CSVRecord csvRecord = 
csvLineParser.parse(payload);
-                               if (colNames.size() != csvRecord.size()) {
-                                       logger.debug("payload data {} doesn't 
match the fields mapping {} ", payload, colNames);
-                                       continue;
-                               }
-                               Map<String, String> data = new HashMap<String, 
String>();
-                               for (int i = 0; i < csvRecord.size(); i++) {
-                                       data.put(colNames.get(i), 
csvRecord.get(i));
-                               }
-                               Collection<String> values = data.values();
-                               if (values.contains(null)) {
-                                       logger.debug("payload data {} doesn't 
match the fields mapping {} ", payload, colNames);
-                                       continue;
-                               }
-
-                               int index = 1;
-                               int offset = 0;
-                               for (int i = 0; i < colNames.size(); i++, 
offset++) {
-                                       if (columnMetadata[offset] == null) {
-                                               continue;
-                                       }
-                                       String colName = colNames.get(i);
-                                       value = data.get(colName);
-                                       sqlType = 
columnMetadata[offset].getSqlType();
-                                       PDataType pDataType = 
PDataType.fromTypeId(sqlType);
-                                       Object upsertValue;
-                                       if (pDataType.isArrayType()) {
-                                               String arrayJson = 
Arrays.toString(value.split(csvArrayDelimiter));
-                                               JSONArray jsonArray = new 
JSONArray(new JSONTokener(arrayJson));
-                                               Object[] vals = new 
Object[jsonArray.length()];
-                                               for (int x = 0; x < 
jsonArray.length(); x++) {
-                                                       vals[x] = 
jsonArray.get(x);
-                                               }
-                                               String baseTypeSqlName = 
PDataType.arrayBaseType(pDataType).getSqlTypeName();
-                                               Array array = 
connection.createArrayOf(baseTypeSqlName, vals);
-                                               upsertValue = 
pDataType.toObject(array, pDataType);
-                                       } else {
-                                               upsertValue = 
pDataType.toObject(value);
-                                       }
-                                       if (upsertValue != null) {
-                                               colUpsert.setObject(index++, 
upsertValue, sqlType);
-                                       } else {
-                                               colUpsert.setNull(index++, 
sqlType);
-                                       }
-                               }
-
-                               // add headers if necessary
-                               Map<String, String> headerValues = 
event.getHeaders();
-                               for (int i = 0; i < headers.size(); i++, 
offset++) {
-                                       String headerName = headers.get(i);
-                                       String headerValue = 
headerValues.get(headerName);
-                                       sqlType = 
columnMetadata[offset].getSqlType();
-                                       Object upsertValue = 
PDataType.fromTypeId(sqlType).toObject(headerValue);
-                                       if (upsertValue != null) {
-                                               colUpsert.setObject(index++, 
upsertValue, sqlType);
-                                       } else {
-                                               colUpsert.setNull(index++, 
sqlType);
-                                       }
-                               }
-
-                               if (autoGenerateKey) {
-                                       sqlType = 
columnMetadata[offset].getSqlType();
-                                       String generatedRowValue = 
this.keyGenerator.generate();
-                                       Object rowkeyValue = 
PDataType.fromTypeId(sqlType).toObject(generatedRowValue);
-                                       colUpsert.setObject(index++, 
rowkeyValue, sqlType);
-                               }
-                               colUpsert.execute();
-                       }
-                       connection.commit();
-               } catch (Exception ex) {
-                       logger.error("An error {} occurred during persisting 
the event ", ex.getMessage());
-                       throw new SQLException(ex.getMessage());
-               } finally {
-                       if (wasAutoCommit) {
-                               connection.setAutoCommit(true);
-                       }
-               }
-
-       }
-
-       static class CsvLineParser {
-               private final CSVFormat csvFormat;
-
-               CsvLineParser(char fieldDelimiter, char quote, char escape) {
-                       this.csvFormat = 
CSVFormat.DEFAULT.withIgnoreEmptyLines(true).withDelimiter(fieldDelimiter)
-                                       .withEscape(escape).withQuote(quote);
-               }
-
-               public CSVRecord parse(String input) throws IOException {
-                       CSVParser csvParser = new CSVParser(new 
StringReader(input), this.csvFormat);
-                       CSVRecord record;
-                       try{
-                               record = csvParser.iterator().next();
-                       } catch (Exception e) {
-                               record = null;
-                       }
-
-                       return record;
-
-               }
-       }
-
-}
\ No newline at end of file
diff --git 
a/phoenix5-flume/src/main/java/org/apache/phoenix/flume/serializer/EventSerializer.java
 
b/phoenix5-flume/src/main/java/org/apache/phoenix/flume/serializer/EventSerializer.java
deleted file mode 100644
index 80959f5..0000000
--- 
a/phoenix5-flume/src/main/java/org/apache/phoenix/flume/serializer/EventSerializer.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.flume.serializer;
-
-import java.sql.SQLException;
-import java.util.List;
-
-import org.apache.flume.Event;
-import org.apache.flume.conf.Configurable;
-import org.apache.flume.conf.ConfigurableComponent;
-
-import org.apache.phoenix.util.SQLCloseable;
-
-public interface EventSerializer extends 
Configurable,ConfigurableComponent,SQLCloseable {
-
-    /**
-     * called during the start of the process to initialize the table columns.
-     */
-    public void initialize() throws SQLException;
-    
-    /**
-     * @param events to be written to HBase.
-     * @throws SQLException 
-     */
-    public void upsertEvents(List<Event> events) throws SQLException;
-    
-}
diff --git 
a/phoenix5-flume/src/main/java/org/apache/phoenix/flume/serializer/EventSerializers.java
 
b/phoenix5-flume/src/main/java/org/apache/phoenix/flume/serializer/EventSerializers.java
deleted file mode 100644
index 8c99d7d..0000000
--- 
a/phoenix5-flume/src/main/java/org/apache/phoenix/flume/serializer/EventSerializers.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.flume.serializer;
-
-public enum EventSerializers {
-
-    REGEX(RegexEventSerializer.class.getName()), 
JSON(JsonEventSerializer.class.getName()), 
CSV(CsvEventSerializer.class.getName());
-    
-    private final String className;
-    
-    private EventSerializers(String serializerClassName) {
-        this.className = serializerClassName;
-    }
-
-    /**
-     * @return Returns the serializer className.
-     */
-    public String getClassName() {
-        return className;
-    }
-}
\ No newline at end of file
diff --git 
a/phoenix5-flume/src/main/java/org/apache/phoenix/flume/serializer/JsonEventSerializer.java
 
b/phoenix5-flume/src/main/java/org/apache/phoenix/flume/serializer/JsonEventSerializer.java
deleted file mode 100644
index 24ebea8..0000000
--- 
a/phoenix5-flume/src/main/java/org/apache/phoenix/flume/serializer/JsonEventSerializer.java
+++ /dev/null
@@ -1,232 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.flume.serializer;
-
-import static org.apache.phoenix.flume.FlumeConstants.JSON_DEFAULT;
-import static org.apache.phoenix.flume.FlumeConstants.CONFIG_COLUMNS_MAPPING;
-import static org.apache.phoenix.flume.FlumeConstants.CONFIG_PARTIAL_SCHEMA;
-
-import java.sql.Array;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.phoenix.schema.types.PDataType;
-import org.json.JSONArray;
-import org.json.JSONException;
-import org.json.JSONObject;
-import org.json.JSONTokener;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.jayway.jsonpath.Configuration;
-import com.jayway.jsonpath.JsonPath;
-import com.jayway.jsonpath.spi.json.JsonOrgJsonProvider;
-import com.jayway.jsonpath.spi.mapper.JsonOrgMappingProvider;
-
-public class JsonEventSerializer extends BaseEventSerializer {
-
-       private static final Logger logger = 
LoggerFactory.getLogger(JsonEventSerializer.class);
-
-       private JSONObject jsonSchema;
-       private boolean isProperMapping;
-       private boolean partialSchema;
-
-       /**
-     * 
-     */
-       @Override
-       public void doConfigure(Context context) {
-               final String jsonData = 
context.getString(CONFIG_COLUMNS_MAPPING, JSON_DEFAULT);
-               try {
-                       jsonSchema = new JSONObject(jsonData);
-                       if (jsonSchema.length() == 0) {
-                               for (String colName : colNames) {
-                                       jsonSchema.put(colName, colName);
-                               }
-                               isProperMapping = true;
-                       } else {
-                               Iterator<String> keys = jsonSchema.keys();
-                               List<String> keylist = new ArrayList<String>();
-                               while (keys.hasNext()) {
-                                       keylist.add(keys.next());
-                               }
-                               isProperMapping = 
CollectionUtils.isEqualCollection(keylist, colNames);
-                       }
-               } catch (JSONException e) {
-                       e.printStackTrace();
-                       logger.debug("json mapping not proper, verify the data 
{} ", jsonData);
-               }
-               partialSchema = context.getBoolean(CONFIG_PARTIAL_SCHEMA, 
false);
-       }
-
-       /**
-     * 
-     */
-       @Override
-       public void doInitialize() throws SQLException {
-               // NO-OP
-       }
-
-       @Override
-       public void upsertEvents(List<Event> events) throws SQLException {
-               if (events == null){
-                       throw new NullPointerException();
-               }
-               if (connection == null){
-                       throw new NullPointerException();
-               }
-               if (this.upsertStatement == null){
-                       throw new NullPointerException();
-               }
-               if (!isProperMapping) {
-                       throw new IllegalArgumentException("Please verify 
fields mapping is not properly done..");
-               }
-               boolean wasAutoCommit = connection.getAutoCommit();
-               connection.setAutoCommit(false);
-               try (PreparedStatement colUpsert = 
connection.prepareStatement(upsertStatement)) {
-                       String value = null;
-                       Integer sqlType = null;
-                       JSONObject inputJson = new JSONObject();
-                       for (Event event : events) {
-                               byte[] payloadBytes = event.getBody();
-                               if (payloadBytes == null || payloadBytes.length 
== 0) {
-                                       continue;
-                               }
-                               String payload = new String(payloadBytes);
-
-                               try {
-                                       inputJson = new JSONObject(payload);
-                               } catch (Exception e) {
-                                       logger.debug("payload is not proper 
json");
-                                       continue;
-                               }
-
-                               Map<String, String> data = new HashMap<String, 
String>();
-                               for (String colName : colNames) {
-                                       String pattern = colName;
-                                       if (jsonSchema.has(colName)) {
-                                               Object obj = 
jsonSchema.opt(colName);
-                                               if (null != obj) {
-                                                       pattern = 
obj.toString();
-                                               }
-                                       }
-                                       pattern = "$." + pattern;
-                                       value = getPatternData(inputJson, 
pattern);
-
-                                       // if field mapping data is null then 
look for column data
-                                       if (null == value && partialSchema) {
-                                               pattern = "$." + colName;
-                                               value = 
getPatternData(inputJson, pattern);
-                                       }
-
-                                       data.put(colName, value);
-                               }
-
-                               Collection<String> values = data.values();
-                               if (values.contains(null)) {
-                                       logger.debug("payload data {} doesn't 
match the fields mapping {} ", inputJson, jsonSchema);
-                                       continue;
-                               }
-
-                               int index = 1;
-                               int offset = 0;
-                               for (int i = 0; i < colNames.size(); i++, 
offset++) {
-                                       if (columnMetadata[offset] == null) {
-                                               continue;
-                                       }
-                                       String colName = colNames.get(i);
-                                       value = data.get(colName);
-                                       sqlType = 
columnMetadata[offset].getSqlType();
-                                       PDataType pDataType = 
PDataType.fromTypeId(sqlType);
-                                       Object upsertValue;
-                                       if (pDataType.isArrayType()) {
-                                               JSONArray jsonArray = new 
JSONArray(new JSONTokener(value));
-                                               Object[] vals = new 
Object[jsonArray.length()];
-                                               for (int x = 0; x < 
jsonArray.length(); x++) {
-                                                       vals[x] = 
jsonArray.get(x);
-                                               }
-                                               String baseTypeSqlName = 
PDataType.arrayBaseType(pDataType).getSqlTypeName();
-                                               Array array = 
connection.createArrayOf(baseTypeSqlName, vals);
-                                               upsertValue = 
pDataType.toObject(array, pDataType);
-                                       } else {
-                                               upsertValue = 
pDataType.toObject(value);
-                                       }
-                                       if (upsertValue != null) {
-                                               colUpsert.setObject(index++, 
upsertValue, sqlType);
-                                       } else {
-                                               colUpsert.setNull(index++, 
sqlType);
-                                       }
-                               }
-
-                               // add headers if necessary
-                               Map<String, String> headerValues = 
event.getHeaders();
-                               for (int i = 0; i < headers.size(); i++, 
offset++) {
-                                       String headerName = headers.get(i);
-                                       String headerValue = 
headerValues.get(headerName);
-                                       sqlType = 
columnMetadata[offset].getSqlType();
-                                       Object upsertValue = 
PDataType.fromTypeId(sqlType).toObject(headerValue);
-                                       if (upsertValue != null) {
-                                               colUpsert.setObject(index++, 
upsertValue, sqlType);
-                                       } else {
-                                               colUpsert.setNull(index++, 
sqlType);
-                                       }
-                               }
-
-                               if (autoGenerateKey) {
-                                       sqlType = 
columnMetadata[offset].getSqlType();
-                                       String generatedRowValue = 
this.keyGenerator.generate();
-                                       Object rowkeyValue = 
PDataType.fromTypeId(sqlType).toObject(generatedRowValue);
-                                       colUpsert.setObject(index++, 
rowkeyValue, sqlType);
-                               }
-                               colUpsert.execute();
-                       }
-                       connection.commit();
-               } catch (Exception ex) {
-                       logger.error("An error {} occurred during persisting 
the event ", ex.getMessage());
-                       throw new SQLException(ex.getMessage());
-               } finally {
-                       if (wasAutoCommit) {
-                               connection.setAutoCommit(true);
-                       }
-               }
-
-       }
-
-       private String getPatternData(JSONObject json, String pattern) {
-               Configuration JSON_ORG_CONFIGURATION = 
Configuration.builder().mappingProvider(new JsonOrgMappingProvider())
-                               .jsonProvider(new 
JsonOrgJsonProvider()).build();
-               String value;
-               try {
-                       Object object = 
JsonPath.using(JSON_ORG_CONFIGURATION).parse(json).read(pattern);
-                       value = object.toString();
-               } catch (Exception e) {
-                       value = null;
-               }
-               return value;
-       }
-
-}
\ No newline at end of file
diff --git 
a/phoenix5-flume/src/main/java/org/apache/phoenix/flume/serializer/RegexEventSerializer.java
 
b/phoenix5-flume/src/main/java/org/apache/phoenix/flume/serializer/RegexEventSerializer.java
deleted file mode 100644
index bcf7bed..0000000
--- 
a/phoenix5-flume/src/main/java/org/apache/phoenix/flume/serializer/RegexEventSerializer.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.flume.serializer;
-
-import static 
org.apache.phoenix.flume.FlumeConstants.CONFIG_REGULAR_EXPRESSION;
-import static org.apache.phoenix.flume.FlumeConstants.IGNORE_CASE_CONFIG;
-import static org.apache.phoenix.flume.FlumeConstants.IGNORE_CASE_DEFAULT;
-import static org.apache.phoenix.flume.FlumeConstants.REGEX_DEFAULT;
-
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-import java.util.List;
-import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.phoenix.schema.types.PDataType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-public class RegexEventSerializer extends BaseEventSerializer {
-
-    private static final Logger logger = 
LoggerFactory.getLogger(RegexEventSerializer.class);
-  
-    private Pattern inputPattern;
-    
-    /**
-     * 
-     */
-    @Override
-    public void doConfigure(Context context) {
-        final String regex    = context.getString(CONFIG_REGULAR_EXPRESSION, 
REGEX_DEFAULT);
-        final boolean regexIgnoreCase = 
context.getBoolean(IGNORE_CASE_CONFIG,IGNORE_CASE_DEFAULT);
-        inputPattern = Pattern.compile(regex, Pattern.DOTALL + 
(regexIgnoreCase ? Pattern.CASE_INSENSITIVE : 0));
-     }
-
-     
-    /**
-     * 
-     */
-    @Override
-    public void doInitialize() throws SQLException {
-        // NO-OP
-    }
-    
-   
-    @Override
-    public void upsertEvents(List<Event> events) throws SQLException {
-        if (events == null){
-            throw new NullPointerException();
-        }
-        if (connection == null){
-            throw new NullPointerException();
-        }
-        if (this.upsertStatement == null){
-            throw new NullPointerException();
-        }
-
-       boolean wasAutoCommit = connection.getAutoCommit();
-       connection.setAutoCommit(false);
-       try (PreparedStatement colUpsert = 
connection.prepareStatement(upsertStatement)) {
-           String value = null;
-           Integer sqlType = null;
-           for(Event event : events) {
-               byte [] payloadBytes = event.getBody();
-               if(payloadBytes == null || payloadBytes.length == 0) {
-                   continue;
-               }
-               String payload = new String(payloadBytes);
-               Matcher m = inputPattern.matcher(payload.trim());
-               
-               if (!m.matches()) {
-                 logger.debug("payload {} doesn't match the pattern {} ", 
payload, inputPattern.toString());  
-                 continue;
-               }
-               if (m.groupCount() != colNames.size()) {
-                 logger.debug("payload {} size doesn't match the pattern {} ", 
m.groupCount(), colNames.size());
-                 continue;
-               }
-               int index = 1 ;
-               int offset = 0;
-               for (int i = 0 ; i <  colNames.size() ; i++,offset++) {
-                   if (columnMetadata[offset] == null ) {
-                       continue;
-                   }
-                   
-                   value = m.group(i + 1);
-                   sqlType = columnMetadata[offset].getSqlType();
-                   Object upsertValue = 
PDataType.fromTypeId(sqlType).toObject(value);
-                   if (upsertValue != null) {
-                       colUpsert.setObject(index++, upsertValue, sqlType);
-                   } else {
-                       colUpsert.setNull(index++, sqlType);
-                   }
-                }
-               
-               //add headers if necessary
-               Map<String,String> headerValues = event.getHeaders();
-               for(int i = 0 ; i < headers.size() ; i++ , offset++) {
-                
-                   String headerName  = headers.get(i);
-                   String headerValue = headerValues.get(headerName);
-                   sqlType = columnMetadata[offset].getSqlType();
-                   Object upsertValue = 
PDataType.fromTypeId(sqlType).toObject(headerValue);
-                   if (upsertValue != null) {
-                       colUpsert.setObject(index++, upsertValue, sqlType);
-                   } else {
-                       colUpsert.setNull(index++, sqlType);
-                   }
-               }
-  
-               if(autoGenerateKey) {
-                   sqlType = columnMetadata[offset].getSqlType();
-                   String generatedRowValue = this.keyGenerator.generate();
-                   Object rowkeyValue = 
PDataType.fromTypeId(sqlType).toObject(generatedRowValue);
-                   colUpsert.setObject(index++, rowkeyValue ,sqlType);
-               } 
-               colUpsert.execute();
-           }
-           connection.commit();
-       } catch(Exception ex){
-           logger.error("An error {} occurred during persisting the event 
",ex.getMessage());
-           throw new SQLException(ex.getMessage());
-       } finally {
-           if(wasAutoCommit) {
-               connection.setAutoCommit(true);
-           }
-       }
-       
-    }
-
-}
diff --git 
a/phoenix5-flume/src/main/java/org/apache/phoenix/flume/sink/PhoenixSink.java 
b/phoenix5-flume/src/main/java/org/apache/phoenix/flume/sink/PhoenixSink.java
deleted file mode 100644
index 656ff43..0000000
--- 
a/phoenix5-flume/src/main/java/org/apache/phoenix/flume/sink/PhoenixSink.java
+++ /dev/null
@@ -1,221 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.flume.sink;
-
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.flume.Channel;
-import org.apache.flume.ChannelException;
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.EventDeliveryException;
-import org.apache.flume.Transaction;
-import org.apache.flume.conf.Configurable;
-import org.apache.flume.instrumentation.SinkCounter;
-import org.apache.flume.sink.AbstractSink;
-import org.apache.phoenix.flume.FlumeConstants;
-import org.apache.phoenix.flume.serializer.EventSerializer;
-import org.apache.phoenix.flume.serializer.EventSerializers;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-public class PhoenixSink extends AbstractSink implements Configurable {
-    private static final Logger logger = 
LoggerFactory.getLogger(PhoenixSink.class);
-    private static AtomicInteger counter = new AtomicInteger();
-    private static final String NAME   = "Phoenix Sink__";
-  
-    private SinkCounter sinkCounter;
-    private Integer    batchSize;
-    private EventSerializer serializer;
- 
-    public PhoenixSink(){
-    }
-    
-    @Override
-    public void configure(Context context){
-        this.setName(NAME + counter.incrementAndGet());
-        this.batchSize = context.getInteger(FlumeConstants.CONFIG_BATCHSIZE, 
FlumeConstants.DEFAULT_BATCH_SIZE);
-        final String eventSerializerType = 
context.getString(FlumeConstants.CONFIG_SERIALIZER);
-        
-        if (eventSerializerType == null) {
-          throw new NullPointerException("Event serializer cannot be empty, 
please specify in the configuration file");
-        }
-        initializeSerializer(context,eventSerializerType);
-        this.sinkCounter = new SinkCounter(this.getName());
-    }
-
-    /**
-     * Initializes the serializer for flume events.
-     * @param eventSerializerType
-     */
-    private void initializeSerializer(final Context context,final String 
eventSerializerType) {
-        String serializerClazz = null;
-        EventSerializers eventSerializer = null;
-
-        try {
-            eventSerializer = 
EventSerializers.valueOf(eventSerializerType.toUpperCase());
-        } catch(IllegalArgumentException iae) {
-            serializerClazz = eventSerializerType;
-        }
-       
-       final Context serializerContext = new Context();
-       
serializerContext.putAll(context.getSubProperties(FlumeConstants.CONFIG_SERIALIZER_PREFIX));
-       copyPropertiesToSerializerContext(context,serializerContext);
-             
-       try {
-         @SuppressWarnings("unchecked")
-         Class<? extends EventSerializer> clazz = null;
-         if(serializerClazz == null) {
-             clazz = (Class<? extends EventSerializer>) 
Class.forName(eventSerializer.getClassName());
-         }
-         else {
-             clazz = (Class<? extends EventSerializer>) 
Class.forName(serializerClazz);
-         }
-
-         serializer = clazz.newInstance();
-         serializer.configure(serializerContext);
-         
-       } catch (Exception e) {
-         logger.error("Could not instantiate event serializer." , e);
-         if (e instanceof RuntimeException){
-           throw (RuntimeException) e;
-         }
-         else {
-           throw new RuntimeException(e);
-         }
-       }
-    }
-
-    private void copyPropertiesToSerializerContext(Context context, Context 
serializerContext) {
-        
-        
serializerContext.put(FlumeConstants.CONFIG_TABLE_DDL,context.getString(FlumeConstants.CONFIG_TABLE_DDL));
-        
serializerContext.put(FlumeConstants.CONFIG_TABLE,context.getString(FlumeConstants.CONFIG_TABLE));
-        
serializerContext.put(FlumeConstants.CONFIG_ZK_QUORUM,context.getString(FlumeConstants.CONFIG_ZK_QUORUM));
-        
serializerContext.put(FlumeConstants.CONFIG_JDBC_URL,context.getString(FlumeConstants.CONFIG_JDBC_URL));
-        
serializerContext.put(FlumeConstants.CONFIG_BATCHSIZE,context.getString(FlumeConstants.CONFIG_BATCHSIZE));
-  }
-
-    @Override
-    public void start() {
-        logger.info("Starting sink {} ",this.getName());
-        sinkCounter.start();
-        try {
-              serializer.initialize();
-              sinkCounter.incrementConnectionCreatedCount();
-        } catch(Exception ex) {
-            sinkCounter.incrementConnectionFailedCount();
-            logger.error("Error {} in initializing the 
serializer.",ex.getMessage());
-          if (ex instanceof RuntimeException){
-            throw (RuntimeException) ex;
-          }
-          else {
-            throw new RuntimeException(ex);
-          }
-       }
-       super.start();
-    }
-    
-    @Override
-    public void stop(){
-      super.stop();
-      try {
-          serializer.close();
-        } catch (SQLException e) {
-            logger.error(" Error while closing connection {} for sink {} 
",e.getMessage(),this.getName());
-        }
-      sinkCounter.incrementConnectionClosedCount();
-      sinkCounter.stop();
-    }
-
-    @Override
-    public Status process() throws EventDeliveryException {
-        
-        Status status = Status.READY;
-        Channel channel = getChannel();
-        Transaction transaction = null;
-        List<Event>  events = new ArrayList<>(this.batchSize);
-        long startTime = System.nanoTime();
-        try {
-            transaction = channel.getTransaction();
-            transaction.begin();
-            
-            for(long i = 0; i < this.batchSize; i++) {
-                Event event = channel.take();
-                if(event == null){
-                  status = Status.BACKOFF;
-                  if (i == 0) {
-                    sinkCounter.incrementBatchEmptyCount();
-                  } else {
-                    sinkCounter.incrementBatchUnderflowCount();
-                  }
-                  break;
-                } else {
-                  events.add(event);
-                }
-            }
-            if (!events.isEmpty()) {
-               if (events.size() == this.batchSize) {
-                    sinkCounter.incrementBatchCompleteCount();
-                }
-                else {
-                    sinkCounter.incrementBatchUnderflowCount();
-                    status = Status.BACKOFF;
-                }
-                // save to Hbase
-                serializer.upsertEvents(events);
-                sinkCounter.addToEventDrainSuccessCount(events.size());
-            }
-            else {
-                logger.debug("no events to process ");
-                sinkCounter.incrementBatchEmptyCount();
-                status = Status.BACKOFF;
-            }
-            transaction.commit();
-        } catch (ChannelException e) {
-            transaction.rollback();
-            status = Status.BACKOFF;
-            sinkCounter.incrementConnectionFailedCount();
-        }
-        catch (SQLException e) {
-            sinkCounter.incrementConnectionFailedCount();
-            transaction.rollback();
-            logger.error("exception while persisting to Hbase ", e);
-            throw new EventDeliveryException("Failed to persist message to 
Hbase", e);
-        }
-        catch (Throwable e) {
-            transaction.rollback();
-            logger.error("exception while processing in Phoenix Sink", e);
-            throw new EventDeliveryException("Failed to persist message", e);
-        }
-        finally {
-            logger.info(String.format("Time taken to process [%s] events was 
[%s] seconds",
-                    events.size(),
-                    TimeUnit.SECONDS.convert(System.nanoTime() - startTime, 
TimeUnit.NANOSECONDS)));
-            if( transaction != null ) {
-                transaction.close();
-            }
-        }
-        return status;
-   }
-
-}
diff --git a/pom.xml b/pom.xml
index b1a590a..461c073 100644
--- a/pom.xml
+++ b/pom.xml
@@ -45,7 +45,6 @@
       <!-- Changing the module order here may cause maven to get stuck in an 
infinite loop -->
       <module>phoenix5-hive</module>
       <module>phoenix5-hive-shaded</module>
-      <module>phoenix5-flume</module>
       <module>phoenix5-spark</module>
       <module>phoenix5-spark-shaded</module>
       <module>phoenix5-spark3</module>
@@ -98,7 +97,6 @@
     <hive.version>${hive3.version}</hive.version>
     <hive3-storage.version>2.7.0</hive3-storage.version>
     <hive-storage.version>${hive3-storage.version}</hive-storage.version>
-    <flume.version>1.4.0</flume.version>
     <spark.version>2.4.0</spark.version>
     <spark3.version>3.0.3</spark3.version>
     <scala.version>2.11.12</scala.version>
@@ -506,11 +504,6 @@
         <artifactId>phoenix-shaded-guava</artifactId>
         <version>${phoenix.thirdparty.version}</version>
       </dependency>
-      <dependency>
-        <groupId>org.apache.phoenix</groupId>
-        <artifactId>phoenix5-flume</artifactId>
-        <version>${project.version}</version>
-      </dependency>
       <dependency>
         <groupId>org.apache.phoenix</groupId>
         <artifactId>phoenix5-spark</artifactId>
@@ -815,39 +808,6 @@
         <artifactId>log4j</artifactId>
         <version>${log4j.version}</version>
       </dependency>
-      <dependency>
-        <groupId>org.apache.flume</groupId>
-        <artifactId>flume-ng-core</artifactId>
-        <version>${flume.version}</version>
-        <exclusions>
-          <exclusion>
-            <groupId>org.xerial.snappy</groupId>
-            <artifactId>snappy-java</artifactId>
-          </exclusion>
-        </exclusions>
-      </dependency>
-      <dependency>
-        <groupId>org.apache.flume</groupId>
-        <artifactId>flume-ng-sdk</artifactId>
-        <version>${flume.version}</version>
-        <exclusions>
-          <exclusion>
-            <groupId>org.xerial.snappy</groupId>
-            <artifactId>snappy-java</artifactId>
-          </exclusion>
-        </exclusions>
-      </dependency>
-      <dependency>
-        <groupId>org.apache.flume</groupId>
-        <artifactId>flume-ng-configuration</artifactId>
-        <version>${flume.version}</version>
-        <exclusions>
-          <exclusion>
-            <groupId>org.xerial.snappy</groupId>
-            <artifactId>snappy-java</artifactId>
-          </exclusion>
-        </exclusions>
-      </dependency>
       <dependency>
         <groupId>junit</groupId>
         <artifactId>junit</artifactId>


Reply via email to