Repository: phoenix
Updated Branches:
  refs/heads/4.x-HBase-1.0 41b09861a -> 92d01e116


PHOENIX-2116: phoenix-flume - Sink/Serializer should be extendable

Removes 'final' from the PhoenixSink class declaration so the sink can be
subclassed, and allows passing the class name of a custom event serializer
instead of one of the built-in serializer types.


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/92d01e11
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/92d01e11
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/92d01e11

Branch: refs/heads/4.x-HBase-1.0
Commit: 92d01e11622ef77a68d10747a587c83d1c8a5a31
Parents: 41b0986
Author: Josh Mahonin <jmaho...@interset.com>
Authored: Thu Sep 3 11:58:06 2015 -0400
Committer: Josh Mahonin <jmaho...@interset.com>
Committed: Thu Sep 3 12:02:13 2015 -0400

----------------------------------------------------------------------
 phoenix-flume/pom.xml                           |  4 +
 .../org/apache/phoenix/flume/PhoenixSinkIT.java | 99 +++++++++++++++++---
 .../flume/serializer/CustomSerializer.java      | 46 +++++++++
 .../phoenix/flume/sink/NullPhoenixSink.java     | 21 +++++
 .../apache/phoenix/flume/sink/PhoenixSink.java  | 24 +++--
 5 files changed, 171 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/92d01e11/phoenix-flume/pom.xml
----------------------------------------------------------------------
diff --git a/phoenix-flume/pom.xml b/phoenix-flume/pom.xml
index 88741f5..098053d 100644
--- a/phoenix-flume/pom.xml
+++ b/phoenix-flume/pom.xml
@@ -57,6 +57,10 @@
       <artifactId>junit</artifactId>
     </dependency>
     <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+    </dependency>
+    <dependency>
       <groupId>com.github.stephenc.high-scale-lib</groupId>
       <artifactId>high-scale-lib</artifactId>
       <version>1.1.1</version>

http://git-wip-us.apache.org/repos/asf/phoenix/blob/92d01e11/phoenix-flume/src/it/java/org/apache/phoenix/flume/PhoenixSinkIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-flume/src/it/java/org/apache/phoenix/flume/PhoenixSinkIT.java 
b/phoenix-flume/src/it/java/org/apache/phoenix/flume/PhoenixSinkIT.java
index 7d4f7af..a59a356 100644
--- a/phoenix-flume/src/it/java/org/apache/phoenix/flume/PhoenixSinkIT.java
+++ b/phoenix-flume/src/it/java/org/apache/phoenix/flume/PhoenixSinkIT.java
@@ -17,25 +17,36 @@
  */
 package org.apache.phoenix.flume;
 
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-import org.apache.flume.Channel;
-import org.apache.flume.Context;
-import org.apache.flume.Sink;
-import org.apache.flume.SinkFactory;
+import org.apache.flume.*;
 import org.apache.flume.channel.MemoryChannel;
 import org.apache.flume.conf.Configurables;
+import org.apache.flume.event.EventBuilder;
 import org.apache.flume.lifecycle.LifecycleState;
 import org.apache.flume.sink.DefaultSinkFactory;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT;
 import org.apache.phoenix.flume.serializer.EventSerializers;
+import org.apache.phoenix.flume.serializer.CustomSerializer;
+import org.apache.phoenix.flume.sink.NullPhoenixSink;
 import org.apache.phoenix.flume.sink.PhoenixSink;
+import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.TestUtil;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.util.Properties;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
 
 public class PhoenixSinkIT extends BaseHBaseManagedTimeIT {
 
@@ -79,7 +90,7 @@ public class PhoenixSinkIT extends BaseHBaseManagedTimeIT {
         Configurables.configure(sink, sinkContext);
     }
     
-    @Test(expected=IllegalArgumentException.class)
+    @Test(expected=RuntimeException.class)
     public void testInvalidConfigurationOfSerializer () {
         
         sinkContext = new Context ();
@@ -98,13 +109,13 @@ public class PhoenixSinkIT extends BaseHBaseManagedTimeIT {
         sinkContext = new Context ();
         sinkContext.put(FlumeConstants.CONFIG_TABLE, "flume_test");
         sinkContext.put(FlumeConstants.CONFIG_JDBC_URL, getUrl());
-        
sinkContext.put(FlumeConstants.CONFIG_SERIALIZER,EventSerializers.REGEX.name());
-        sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX + 
FlumeConstants.CONFIG_COLUMN_NAMES,"col1,col2");
+        sinkContext.put(FlumeConstants.CONFIG_SERIALIZER, 
EventSerializers.REGEX.name());
+        sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX + 
FlumeConstants.CONFIG_COLUMN_NAMES, "col1,col2");
         sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX + 
FlumeConstants.CONFIG_ROWKEY_TYPE_GENERATOR,DefaultKeyGenerator.TIMESTAMP.name());
 
         sink = new PhoenixSink();
         Configurables.configure(sink, sinkContext);
-      
+
         final Channel channel = this.initChannel();
         sink.setChannel(channel);
         try {
@@ -131,14 +142,14 @@ public class PhoenixSinkIT extends BaseHBaseManagedTimeIT 
{
         sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX + 
FlumeConstants.CONFIG_COLUMN_NAMES,"col1,col2");
         sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX + 
FlumeConstants.CONFIG_ROWKEY_TYPE_GENERATOR,DefaultKeyGenerator.TIMESTAMP.name());
 
-        
+
         sink = new PhoenixSink();
         Configurables.configure(sink, sinkContext);
         Assert.assertEquals(LifecycleState.IDLE, sink.getLifecycleState());
-      
+
         final Channel channel = this.initChannel();
         sink.setChannel(channel);
-        
+
         sink.start();
         Assert.assertEquals(LifecycleState.START, sink.getLifecycleState());
         sink.stop();
@@ -160,13 +171,13 @@ public class PhoenixSinkIT extends BaseHBaseManagedTimeIT 
{
         sinkContext.put(FlumeConstants.CONFIG_TABLE_DDL, ddl);
         sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX + 
FlumeConstants.CONFIG_REGULAR_EXPRESSION,"^([^\t]+)\t([^\t]+)$");
         sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX + 
FlumeConstants.CONFIG_COLUMN_NAMES,"col1,col2");
-        sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX + 
FlumeConstants.CONFIG_ROWKEY_TYPE_GENERATOR,DefaultKeyGenerator.TIMESTAMP.name());
+        sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX + 
FlumeConstants.CONFIG_ROWKEY_TYPE_GENERATOR, 
DefaultKeyGenerator.TIMESTAMP.name());
+
 
-        
         sink = new PhoenixSink();
         Configurables.configure(sink, sinkContext);
         Assert.assertEquals(LifecycleState.IDLE, sink.getLifecycleState());
-      
+
         final Channel channel = this.initChannel();
         sink.setChannel(channel);
         
@@ -179,6 +190,66 @@ public class PhoenixSinkIT extends BaseHBaseManagedTimeIT {
             admin.close();
         }
     }
+
+    @Test
+    public void testExtendedSink() throws Exception {
+        // Create a mock NullPhoenixSink which extends PhoenixSink, and verify 
configure() is invoked
+
+        PhoenixSink sink = mock(NullPhoenixSink.class);
+        sinkContext = new Context();
+        sinkContext.put(FlumeConstants.CONFIG_TABLE, "FLUME_TEST_EXTENDED");
+        sinkContext.put(FlumeConstants.CONFIG_JDBC_URL, getUrl());
+        sinkContext.put(FlumeConstants.CONFIG_SERIALIZER, 
CustomSerializer.class.getName());
+        sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX + 
FlumeConstants.CONFIG_COLUMN_NAMES, "ID, COUNTS");
+        sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX + 
FlumeConstants.CONFIG_ROWKEY_TYPE_GENERATOR, 
DefaultKeyGenerator.TIMESTAMP.name());
+
+        Configurables.configure(sink, sinkContext);
+        verify(sink).configure(sinkContext);
+    }
+
+    @Test
+    public void testExtendedSerializer() throws Exception {
+        /*
+        Sadly, we can't mock a serializer, as the PhoenixSink does a 
Class.forName() to instantiate
+        it. Instead, we'll set up a Flume channel and verify the data our 
custom serializer wrote.
+        */
+
+        final String fullTableName = "FLUME_TEST_EXTENDED";
+        final String ddl = "CREATE TABLE " + fullTableName + " (ID BIGINT NOT 
NULL PRIMARY KEY, COUNTS UNSIGNED_LONG)";
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        final Connection conn = DriverManager.getConnection(getUrl(), props);
+        conn.createStatement().execute(ddl);
+        conn.commit();
+
+        sinkContext = new Context();
+        sinkContext.put(FlumeConstants.CONFIG_TABLE, "FLUME_TEST_EXTENDED");
+        sinkContext.put(FlumeConstants.CONFIG_JDBC_URL, getUrl());
+        sinkContext.put(FlumeConstants.CONFIG_SERIALIZER, 
CustomSerializer.class.getName());
+        sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX + 
FlumeConstants.CONFIG_COLUMN_NAMES, "ID, COUNTS");
+        sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX + 
FlumeConstants.CONFIG_ROWKEY_TYPE_GENERATOR, 
DefaultKeyGenerator.TIMESTAMP.name());
+
+        PhoenixSink sink = new PhoenixSink();
+        Configurables.configure(sink, sinkContext);
+
+        // Send a test event through Flume, using our custom serializer
+        final Channel channel = this.initChannel();
+        sink.setChannel(channel);
+        sink.start();
+
+        final Transaction transaction = channel.getTransaction();
+        transaction.begin();
+        channel.put(EventBuilder.withBody(Bytes.toBytes("test event")));
+        transaction.commit();
+        transaction.close();
+
+        sink.process();
+        sink.stop();
+
+        // Verify our serializer wrote out data
+        ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM 
FLUME_TEST_EXTENDED");
+        assertTrue(rs.next());
+        assertTrue(rs.getLong(1) == 1L);
+    }
     
     private Channel initChannel() {
         //Channel configuration

http://git-wip-us.apache.org/repos/asf/phoenix/blob/92d01e11/phoenix-flume/src/it/java/org/apache/phoenix/flume/serializer/CustomSerializer.java
----------------------------------------------------------------------
diff --git 
a/phoenix-flume/src/it/java/org/apache/phoenix/flume/serializer/CustomSerializer.java
 
b/phoenix-flume/src/it/java/org/apache/phoenix/flume/serializer/CustomSerializer.java
new file mode 100644
index 0000000..a0785ae
--- /dev/null
+++ 
b/phoenix-flume/src/it/java/org/apache/phoenix/flume/serializer/CustomSerializer.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.flume.serializer;
+
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.SQLException;
+import java.util.List;
+
+public class CustomSerializer extends BaseEventSerializer {
+    private static final Logger logger = 
LoggerFactory.getLogger(CustomSerializer.class);
+    @Override
+    public void doConfigure(Context context) {
+
+    }
+
+    @Override
+    public void doInitialize() throws SQLException {
+
+    }
+
+    @Override
+    public void upsertEvents(List<Event> events) throws SQLException {
+        // Just execute a sample UPSERT
+        connection.createStatement().execute("UPSERT INTO 
FLUME_TEST_EXTENDED(ID, COUNTS) VALUES(1, 1)");
+        connection.commit();
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/92d01e11/phoenix-flume/src/it/java/org/apache/phoenix/flume/sink/NullPhoenixSink.java
----------------------------------------------------------------------
diff --git 
a/phoenix-flume/src/it/java/org/apache/phoenix/flume/sink/NullPhoenixSink.java 
b/phoenix-flume/src/it/java/org/apache/phoenix/flume/sink/NullPhoenixSink.java
new file mode 100644
index 0000000..1df52e1
--- /dev/null
+++ 
b/phoenix-flume/src/it/java/org/apache/phoenix/flume/sink/NullPhoenixSink.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.flume.sink;
+
+public class NullPhoenixSink extends PhoenixSink {
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/92d01e11/phoenix-flume/src/main/java/org/apache/phoenix/flume/sink/PhoenixSink.java
----------------------------------------------------------------------
diff --git 
a/phoenix-flume/src/main/java/org/apache/phoenix/flume/sink/PhoenixSink.java 
b/phoenix-flume/src/main/java/org/apache/phoenix/flume/sink/PhoenixSink.java
index f9c929d..2b102a2 100644
--- a/phoenix-flume/src/main/java/org/apache/phoenix/flume/sink/PhoenixSink.java
+++ b/phoenix-flume/src/main/java/org/apache/phoenix/flume/sink/PhoenixSink.java
@@ -42,7 +42,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
 import com.google.common.collect.Lists;
 
-public final class PhoenixSink  extends AbstractSink implements Configurable {
+public class PhoenixSink extends AbstractSink implements Configurable {
     private static final Logger logger = 
LoggerFactory.getLogger(PhoenixSink.class);
     private static AtomicInteger counter = new AtomicInteger();
     private static final String NAME   = "Phoenix Sink__";
@@ -70,14 +70,13 @@ public final class PhoenixSink  extends AbstractSink 
implements Configurable {
      * @param eventSerializerType
      */
     private void initializeSerializer(final Context context,final String 
eventSerializerType) {
-        
-       EventSerializers eventSerializer = null;
-       try {
-               eventSerializer =  
EventSerializers.valueOf(eventSerializerType.toUpperCase());
+        String serializerClazz = null;
+        EventSerializers eventSerializer = null;
+
+        try {
+            eventSerializer = 
EventSerializers.valueOf(eventSerializerType.toUpperCase());
         } catch(IllegalArgumentException iae) {
-               logger.error("An invalid eventSerializer {} was passed. Please 
specify one of {} ",eventSerializerType,
-                       
Joiner.on(",").skipNulls().join(EventSerializers.values()));
-               Throwables.propagate(iae);
+            serializerClazz = eventSerializerType;
         }
        
        final Context serializerContext = new Context();
@@ -86,7 +85,14 @@ public final class PhoenixSink  extends AbstractSink 
implements Configurable {
              
        try {
          @SuppressWarnings("unchecked")
-         Class<? extends EventSerializer> clazz = (Class<? extends 
EventSerializer>) Class.forName(eventSerializer.getClassName());
+         Class<? extends EventSerializer> clazz = null;
+         if(serializerClazz == null) {
+             clazz = (Class<? extends EventSerializer>) 
Class.forName(eventSerializer.getClassName());
+         }
+         else {
+             clazz = (Class<? extends EventSerializer>) 
Class.forName(serializerClazz);
+         }
+
          serializer = clazz.newInstance();
          serializer.configure(serializerContext);
          

Reply via email to