Adding Colin's review changes to span receiver

Project: http://git-wip-us.apache.org/repos/asf/incubator-htrace/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-htrace/commit/a6ca3ccf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-htrace/tree/a6ca3ccf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-htrace/diff/a6ca3ccf

Branch: refs/heads/master
Commit: a6ca3ccf8007f41a112baf98328b9311b7148e7b
Parents: 5d38e87
Author: Nisala Nirmana <nisal...@gmail.com>
Authored: Mon Aug 8 02:55:35 2016 +0530
Committer: Nisala Nirmana <nisal...@gmail.com>
Committed: Mon Aug 8 02:55:35 2016 +0530

----------------------------------------------------------------------
 htrace-kudu/pom.xml                             |   51 -
 .../htrace/impl/KuduClientConfiguration.java    |   35 +-
 .../htrace/impl/KuduReceiverConstants.java      |   73 +-
 .../apache/htrace/impl/KuduSpanReceiver.java    |  171 +-
 .../htrace/protobuf/generated/SpanProtos.java   | 2241 ------------------
 htrace-kudu/src/main/protobuf/Span.proto        |   38 -
 .../htrace/impl/TestKuduSpanReceiver.java       |  144 +-
 7 files changed, 295 insertions(+), 2458 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/a6ca3ccf/htrace-kudu/pom.xml
----------------------------------------------------------------------
diff --git a/htrace-kudu/pom.xml b/htrace-kudu/pom.xml
index 58140a6..e977bd9 100644
--- a/htrace-kudu/pom.xml
+++ b/htrace-kudu/pom.xml
@@ -91,10 +91,6 @@ language governing permissions and limitations under the License. -->
                   <pattern>org.apache.commons.logging</pattern>
                   
<shadedPattern>org.apache.htrace.shaded.commons.logging</shadedPattern>
                 </relocation>
-                <relocation>
-                  <pattern>com.google.protobuf</pattern>
-                  
<shadedPattern>org.apache.htrace.shaded.com.google.protobuf</shadedPattern>
-                </relocation>
               </relocations>
             </configuration>
             <goals>
@@ -132,11 +128,6 @@ language governing permissions and limitations under the License. -->
       <classifier>tests</classifier>
       <scope>test</scope>
     </dependency>
-    <dependency>
-      <groupId>com.google.protobuf</groupId>
-      <artifactId>protobuf-java</artifactId>
-      <version>2.5.0</version>
-    </dependency>
     <!-- Global deps. -->
     <dependency>
       <groupId>commons-logging</groupId>
@@ -169,48 +160,6 @@ language governing permissions and limitations under the License. -->
 
   <profiles>
     <profile>
-      <id>compile-protobuf</id>
-      <activation>
-        <property>
-          <name>compile-protobuf</name>
-        </property>
-      </activation>
-      <build>
-        <plugins>
-          <plugin>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-maven-plugins</artifactId>
-            <version>2.7.1</version>
-            <configuration>
-              <protocVersion>${protobuf.version}</protocVersion>
-              <protocCommand>${protoc.path}</protocCommand>
-            </configuration>
-            <executions>
-              <execution>
-                <id>compile-protoc</id>
-                <phase>generate-sources</phase>
-                <goals>
-                  <goal>protoc</goal>
-                </goals>
-                <configuration>
-                  <imports>
-                    <param>${basedir}/src/main/protobuf</param>
-                  </imports>
-                  <source>
-                    <directory>${basedir}/src/main/protobuf</directory>
-                    <includes>
-                      <include>Span.proto</include>
-                    </includes>
-                  </source>
-                  <output>${basedir}/src/main/java/</output>
-                </configuration>
-              </execution>
-            </executions>
-          </plugin>
-        </plugins>
-      </build>
-    </profile>
-    <profile>
       <id>dist</id>
       <build>
         <plugins>

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/a6ca3ccf/htrace-kudu/src/main/java/org/apache/htrace/impl/KuduClientConfiguration.java
----------------------------------------------------------------------
diff --git a/htrace-kudu/src/main/java/org/apache/htrace/impl/KuduClientConfiguration.java b/htrace-kudu/src/main/java/org/apache/htrace/impl/KuduClientConfiguration.java
index c58dbb2..742f0ef 100644
--- a/htrace-kudu/src/main/java/org/apache/htrace/impl/KuduClientConfiguration.java
+++ b/htrace-kudu/src/main/java/org/apache/htrace/impl/KuduClientConfiguration.java
@@ -22,20 +22,35 @@ import org.kududb.client.KuduClient.KuduClientBuilder;
 
 public class KuduClientConfiguration {
 
-  private String host;
-  private String port;
-  private Integer workerCount;
-  private Integer bossCount;
-  private Boolean isStatisticsEnabled;
-  private Long adminOperationTimeout;
-  private Long operationTimeout;
-  private Long socketReadTimeout;
+  private final String host;
+  private final String port;
+  private final Integer workerCount;
+  private final Integer bossCount;
+  private final Boolean isStatisticsEnabled;
+  private final Long adminOperationTimeout;
+  private final Long operationTimeout;
+  private final Long socketReadTimeout;
+
+  public KuduClientConfiguration(String host,
+                                 String port,
+                                 Integer workerCount,
+                                 Integer bossCount,
+                                 Boolean isStatisticsEnabled,
+                                 Long adminOperationTimeout,
+                                 Long operationTimeout,
+                                 Long socketReadTimeout) {
 
-  public KuduClientConfiguration(String host, String port) {
     this.host = host;
     this.port = port;
+    this.workerCount = workerCount;
+    this.bossCount = bossCount;
+    this.isStatisticsEnabled = isStatisticsEnabled;
+    this.adminOperationTimeout = adminOperationTimeout;
+    this.operationTimeout = operationTimeout;
+    this.socketReadTimeout = socketReadTimeout;
   }
 
+  /*
   public void setWorkerCount(int workerCount) {
     this.workerCount = workerCount;
   }
@@ -60,6 +75,8 @@ public class KuduClientConfiguration {
     this.socketReadTimeout = socketReadTimeout;
   }
 
+  */
+
   public KuduClient buildClient() {
     KuduClientBuilder builder = new KuduClient
             .KuduClientBuilder(host.concat(":").concat(port));

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/a6ca3ccf/htrace-kudu/src/main/java/org/apache/htrace/impl/KuduReceiverConstants.java
----------------------------------------------------------------------
diff --git a/htrace-kudu/src/main/java/org/apache/htrace/impl/KuduReceiverConstants.java b/htrace-kudu/src/main/java/org/apache/htrace/impl/KuduReceiverConstants.java
index a605dfe..be98311 100644
--- a/htrace-kudu/src/main/java/org/apache/htrace/impl/KuduReceiverConstants.java
+++ b/htrace-kudu/src/main/java/org/apache/htrace/impl/KuduReceiverConstants.java
@@ -19,31 +19,52 @@ package org.apache.htrace.impl;
 
 public class KuduReceiverConstants {
 
-  public static final String KUDU_MASTER_HOST_KEY = "htrace.kudu.master.host";
-  public static final String DEFAULT_KUDU_MASTER_HOST = "127.0.0.1";
-  public static final String KUDU_MASTER_PORT_KEY = "htrace.kudu.master.port";
-  public static final String DEFAULT_KUDU_MASTER_PORT = "7051";
-  public static final String SPAN_BLOCKING_QUEUE_SIZE_KEY = 
"htrace.kudu.span.queue.size";
-  public static final int DEFAULT_SPAN_BLOCKING_QUEUE_SIZE = 1000;
-  public static final String KUDU_TABLE_KEY = "htrace.kudu.table";
-  public static final String DEFAULT_KUDU_TABLE = "htrace";
-  public static final String MAX_SPAN_BATCH_SIZE_KEY = 
"htrace.kudu.batch.size";
-  public static final int DEFAULT_MAX_SPAN_BATCH_SIZE = 100;
-  public static final String NUM_PARALLEL_THREADS_KEY = 
"htrace.kudu.num.threads";
-  public static final int DEFAULT_NUM_PARALLEL_THREADS = 1;
-  public static final String KUDU_COLUMN_SPAN_ID_KEY = 
"htrace.kudu.column.spanid";
-  public static final String DEFAULT_KUDU_COLUMN_SPAN_ID = "span_id";
-  public static final String KUDU_COLUMN_SPAN_KEY = "htrace.kudu.column.span";
-  public static final String DEFAULT_KUDU_COLUMN_SPAN = "span";
-  public static final String KUDU_COLUMN_ROOT_SPAN_KEY = 
"htrace.kudu.column.rootspan";
-  public static final String DEFAULT_KUDU_COLUMN_ROOT_SPAN = "root_span";
-  public static final String KUDU_COLUMN_ROOT_SPAN_START_TIME_KEY = 
"htrace.kudu.column.rootspan.starttime";
-  public static final String DEFAULT_KUDU_COLUMN_ROOT_SPAN_START_TIME = 
"root_span_start_time";
-  public static final String KUDU_CLIENT_WORKER_COUNT_KEY = 
"htrace.kudu.client.worker.count";
-  public static final String KUDU_CLIENT_BOSS_COUNT_KEY = 
"htrace.kudu.client.boss.count";
-  public static final String KUDU_CLIENT_STATISTICS_ENABLED_KEY = 
"htrace.kudu.client.statistics.enabled";
-  public static final String KUDU_CLIENT_TIMEOUT_ADMIN_OPERATION_KEY = 
"htrace.kudu.client.timeout.admin.operation";
-  public static final String KUDU_CLIENT_TIMEOUT_OPERATION_KEY = 
"htrace.kudu.client.timeout.operation";
-  public static final String KUDU_CLIENT_TIMEOUT_SOCKET_READ_KEY = 
"htrace.kudu.client.timeout.socket.read";
+  static final String KUDU_MASTER_HOST_KEY = "kudu.master.host";
+  static final String DEFAULT_KUDU_MASTER_HOST = "127.0.0.1";
+  static final String KUDU_MASTER_PORT_KEY = "kudu.master.port";
+  static final String DEFAULT_KUDU_MASTER_PORT = "7051";
+  static final String SPAN_BLOCKING_QUEUE_SIZE_KEY = "kudu.span.queue.size";
+  static final int DEFAULT_SPAN_BLOCKING_QUEUE_SIZE = 1000;
+  static final String KUDU_SPAN_TABLE_KEY = "kudu.span.table";
+  static final String DEFAULT_KUDU_SPAN_TABLE = "span";
+  static final String KUDU_SPAN_TIMELINE_ANNOTATION_TABLE_KEY = 
"kudu.span.timeline.annotation.table";
+  static final String DEFAULT_KUDU_SPAN_TIMELINE_ANNOTATION_TABLE = 
"span.timeline";
+  static final String MAX_SPAN_BATCH_SIZE_KEY = "kudu.batch.size";
+  static final int DEFAULT_MAX_SPAN_BATCH_SIZE = 100;
+  static final String NUM_PARALLEL_THREADS_KEY = "kudu.num.threads";
+  static final int DEFAULT_NUM_PARALLEL_THREADS = 1;
+  static final String KUDU_COLUMN_SPAN_TRACE_ID_KEY = 
"kudu.column.span.traceid";
+  static final String DEFAULT_KUDU_COLUMN_SPAN_TRACE_ID = "trace_id";
+  static final String KUDU_COLUMN_SPAN_START_TIME_KEY = 
"kudu.column.span.starttime";
+  static final String DEFAULT_KUDU_COLUMN_SPAN_START_TIME = "start_time";
+  static final String KUDU_COLUMN_SPAN_STOP_TIME_KEY = 
"kudu.column.span.stoptime";
+  static final String DEFAULT_KUDU_COLUMN_SPAN_STOP_TIME = "stop_time";
+  static final String KUDU_COLUMN_SPAN_SPAN_ID_KEY = "kudu.column.span.spanid";
+  static final String DEFAULT_KUDU_COLUMN_SPAN_SPAN_ID = "span_id";
+  static final String KUDU_COLUMN_SPAN_PROCESS_ID_KEY = 
"kudu.column.span.processid";
+  static final String DEFAULT_KUDU_COLUMN_SPAN_PROCESS_ID = "process_id";
+  static final String KUDU_COLUMN_SPAN_PARENT_ID_KEY = 
"kudu.column.span.parentid";
+  static final String DEFAULT_KUDU_COLUMN_SPAN_PARENT_ID = "parent_id";
+  static final String KUDU_COLUMN_SPAN_DESCRIPTION_KEY = 
"kudu.column.span.description";
+  static final String DEFAULT_KUDU_COLUMN_SPAN_DESCRIPTION = "description";
+  static final String KUDU_COLUMN_SPAN_PARENT_KEY = "kudu.column.span.parent";
+  static final String DEFAULT_KUDU_COLUMN_SPAN_PARENT = "parent";
+  static final String KUDU_COLUMN_TIMELINE_TIME_KEY = 
"kudu.column.timeline.time";
+  static final String DEFAULT_KUDU_COLUMN_TIMELINE_TIME = "time";
+  static final String KUDU_COLUMN_TIMELINE_MESSAGE_KEY = 
"kudu.column.timeline.message";
+  static final String DEFAULT_KUDU_COLUMN_TIMELINE_MESSAGE = "message";
+  static final String KUDU_COLUMN_TIMELINE_SPANID_KEY = 
"kudu.column.timeline.spanid";
+  static final String DEFAULT_KUDU_COLUMN_TIMELINE_SPANID = "spanid";
+  static final String KUDU_COLUMN_TIMELINE_TIMELINEID_KEY = 
"kudu.column.timeline.timelineid";
+  static final String DEFAULT_KUDU_COLUMN_TIMELINE_TIMELINEID = "timelineid";
+  static final String KUDU_CLIENT_WORKER_COUNT_KEY = 
"kudu.client.worker.count";
+  static final String KUDU_CLIENT_BOSS_COUNT_KEY = "kudu.client.boss.count";
+  static final String KUDU_CLIENT_STATISTICS_ENABLED_KEY = 
"kudu.client.statistics.enabled";
+  static final String KUDU_CLIENT_TIMEOUT_ADMIN_OPERATION_KEY = 
"kudu.client.timeout.admin.operation";
+  static final String KUDU_CLIENT_TIMEOUT_OPERATION_KEY = 
"kudu.client.timeout.operation";
+  static final String KUDU_CLIENT_TIMEOUT_SOCKET_READ_KEY = 
"kudu.client.timeout.socket.read";
 
 }
+
+
+

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/a6ca3ccf/htrace-kudu/src/main/java/org/apache/htrace/impl/KuduSpanReceiver.java
----------------------------------------------------------------------
diff --git a/htrace-kudu/src/main/java/org/apache/htrace/impl/KuduSpanReceiver.java b/htrace-kudu/src/main/java/org/apache/htrace/impl/KuduSpanReceiver.java
index ed3e093..46c324a 100644
--- a/htrace-kudu/src/main/java/org/apache/htrace/impl/KuduSpanReceiver.java
+++ b/htrace-kudu/src/main/java/org/apache/htrace/impl/KuduSpanReceiver.java
@@ -23,8 +23,6 @@ import org.apache.htrace.core.HTraceConfiguration;
 import org.apache.htrace.core.Span;
 import org.apache.htrace.core.SpanReceiver;
 import org.apache.htrace.core.TimelineAnnotation;
-import org.apache.htrace.protobuf.generated.SpanProtos;
-import org.kududb.client.Bytes;
 import org.kududb.client.KuduClient;
 import org.kududb.client.KuduSession;
 import org.kududb.client.KuduTable;
@@ -65,51 +63,113 @@ public class KuduSpanReceiver extends SpanReceiver {
       return thread;
     }
   };
+
   private ExecutorService service;
-  private String table;
-  private String column_span_id;
-  private String column_span;
-  private String column_root_span;
-  private String column_root_span_start_time;
+
+  private String table_span;
+  private String column_span_trace_id;
+  private String column_span_start_time;
+  private String column_span_stop_time;
+  private String column_span_span_id;
+  private String column_span_process_id;
+  private String column_span_parent_id;
+  private String column_span_description;
+  private String column_span_parent;
+
+  private String table_timeline;
+  private String column_timeline_timeline_id;
+  private String column_timeline_time;
+  private String column_timeline_message;
+  private String column_timeline_span_id;
 
   public KuduSpanReceiver(HTraceConfiguration conf) {
-    this.clientConf =
-            new 
KuduClientConfiguration(conf.get(KuduReceiverConstants.KUDU_MASTER_HOST_KEY,
-                    KuduReceiverConstants.DEFAULT_KUDU_MASTER_HOST),
-                    conf.get(KuduReceiverConstants.KUDU_MASTER_PORT_KEY,
-                            KuduReceiverConstants.DEFAULT_KUDU_MASTER_PORT));
+
+    String masterHost;
+    String masterPort;
+    Integer workerCount;
+    Integer bossCount;
+    Boolean isStatisticsEnabled;
+    Long adminOperationTimeout;
+    Long operationTimeout;
+    Long socketReadTimeout;
+
+    masterHost = conf.get(KuduReceiverConstants.KUDU_MASTER_HOST_KEY,
+            KuduReceiverConstants.DEFAULT_KUDU_MASTER_HOST);
+    masterPort = conf.get(KuduReceiverConstants.KUDU_MASTER_PORT_KEY,
+            KuduReceiverConstants.DEFAULT_KUDU_MASTER_PORT);
+
     if (conf.get(KuduReceiverConstants.KUDU_CLIENT_BOSS_COUNT_KEY) != null) {
-      
this.clientConf.setBossCount(Integer.valueOf(conf.get(KuduReceiverConstants.KUDU_CLIENT_BOSS_COUNT_KEY)));
+      bossCount = 
Integer.valueOf(conf.get(KuduReceiverConstants.KUDU_CLIENT_BOSS_COUNT_KEY));
+    } else {
+      bossCount = null;
     }
     if (conf.get(KuduReceiverConstants.KUDU_CLIENT_WORKER_COUNT_KEY) != null) {
-      
this.clientConf.setWorkerCount(Integer.valueOf(conf.get(KuduReceiverConstants.KUDU_CLIENT_WORKER_COUNT_KEY)));
+      workerCount = 
Integer.valueOf(conf.get(KuduReceiverConstants.KUDU_CLIENT_WORKER_COUNT_KEY));
+    } else {
+      workerCount = null;
     }
     if (conf.get(KuduReceiverConstants.KUDU_CLIENT_STATISTICS_ENABLED_KEY) != 
null) {
-      
this.clientConf.setIsStatisticsEnabled(Boolean.valueOf(conf.get(KuduReceiverConstants.KUDU_CLIENT_STATISTICS_ENABLED_KEY)));
+      isStatisticsEnabled = 
Boolean.valueOf(conf.get(KuduReceiverConstants.KUDU_CLIENT_STATISTICS_ENABLED_KEY));
+    } else {
+      isStatisticsEnabled = null;
     }
     if 
(conf.get(KuduReceiverConstants.KUDU_CLIENT_TIMEOUT_ADMIN_OPERATION_KEY) != 
null) {
-      this.clientConf
-              
.setAdminOperationTimeout(Long.valueOf(conf.get(KuduReceiverConstants.KUDU_CLIENT_TIMEOUT_ADMIN_OPERATION_KEY)));
+      adminOperationTimeout = 
Long.valueOf(conf.get(KuduReceiverConstants.KUDU_CLIENT_TIMEOUT_ADMIN_OPERATION_KEY));
+    } else {
+      adminOperationTimeout = null;
     }
     if (conf.get(KuduReceiverConstants.KUDU_CLIENT_TIMEOUT_OPERATION_KEY) != 
null) {
-      this.clientConf
-              
.setOperationTimeout(Long.valueOf(conf.get(KuduReceiverConstants.KUDU_CLIENT_TIMEOUT_OPERATION_KEY)));
+      operationTimeout = 
Long.valueOf(conf.get(KuduReceiverConstants.KUDU_CLIENT_TIMEOUT_OPERATION_KEY));
+    } else {
+      operationTimeout = null;
     }
     if (conf.get(KuduReceiverConstants.KUDU_CLIENT_TIMEOUT_SOCKET_READ_KEY) != 
null) {
-      this.clientConf
-              
.setSocketReadTimeout(Long.valueOf(conf.get(KuduReceiverConstants.KUDU_CLIENT_TIMEOUT_SOCKET_READ_KEY)));
+      socketReadTimeout = 
Long.valueOf(conf.get(KuduReceiverConstants.KUDU_CLIENT_TIMEOUT_SOCKET_READ_KEY));
+    } else {
+      socketReadTimeout = null;
     }
+
+    this.clientConf = new KuduClientConfiguration(masterHost,
+            masterPort,
+            workerCount,
+            bossCount,
+            isStatisticsEnabled,
+            adminOperationTimeout,
+            operationTimeout,
+            socketReadTimeout);
+
     this.queue = new 
ArrayBlockingQueue<Span>(conf.getInt(KuduReceiverConstants.SPAN_BLOCKING_QUEUE_SIZE_KEY,
             KuduReceiverConstants.DEFAULT_SPAN_BLOCKING_QUEUE_SIZE));
-    this.table = conf.get(KuduReceiverConstants.KUDU_TABLE_KEY, 
KuduReceiverConstants.DEFAULT_KUDU_TABLE);
-    this.column_span_id = 
conf.get(KuduReceiverConstants.KUDU_COLUMN_SPAN_ID_KEY,
-            KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_ID);
-    this.column_span = conf.get(KuduReceiverConstants.KUDU_COLUMN_SPAN_KEY,
-            KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN);
-    this.column_root_span = 
conf.get(KuduReceiverConstants.KUDU_COLUMN_ROOT_SPAN_KEY,
-            KuduReceiverConstants.DEFAULT_KUDU_COLUMN_ROOT_SPAN);
-    this.column_root_span_start_time = 
conf.get(KuduReceiverConstants.KUDU_COLUMN_ROOT_SPAN_START_TIME_KEY,
-            KuduReceiverConstants.DEFAULT_KUDU_COLUMN_ROOT_SPAN_START_TIME);
+
+    this.table_span = conf.get(KuduReceiverConstants.KUDU_SPAN_TABLE_KEY, 
KuduReceiverConstants.DEFAULT_KUDU_SPAN_TABLE);
+    this.table_timeline= 
conf.get(KuduReceiverConstants.KUDU_SPAN_TIMELINE_ANNOTATION_TABLE_KEY,
+            KuduReceiverConstants.DEFAULT_KUDU_SPAN_TIMELINE_ANNOTATION_TABLE);
+
+    this.column_span_trace_id = 
conf.get(KuduReceiverConstants.KUDU_COLUMN_SPAN_TRACE_ID_KEY,
+            KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_TRACE_ID);
+    this.column_span_start_time = 
conf.get(KuduReceiverConstants.KUDU_COLUMN_SPAN_START_TIME_KEY,
+            KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_START_TIME);
+    this.column_span_stop_time = 
conf.get(KuduReceiverConstants.KUDU_COLUMN_SPAN_STOP_TIME_KEY,
+            KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_STOP_TIME);
+    this.column_span_span_id = 
conf.get(KuduReceiverConstants.KUDU_COLUMN_SPAN_SPAN_ID_KEY,
+            KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_SPAN_ID);
+    this.column_span_process_id = 
conf.get(KuduReceiverConstants.KUDU_COLUMN_SPAN_PROCESS_ID_KEY,
+            KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_PROCESS_ID);
+    this.column_span_parent_id = 
conf.get(KuduReceiverConstants.KUDU_COLUMN_SPAN_PARENT_ID_KEY,
+            KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_PARENT_ID);
+    this.column_span_description = 
conf.get(KuduReceiverConstants.KUDU_COLUMN_SPAN_DESCRIPTION_KEY,
+            KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_DESCRIPTION);
+    this.column_span_parent = 
conf.get(KuduReceiverConstants.KUDU_COLUMN_SPAN_PARENT_KEY,
+            KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_PARENT);
+    this.column_timeline_time = 
conf.get(KuduReceiverConstants.KUDU_COLUMN_TIMELINE_TIME_KEY,
+            KuduReceiverConstants.DEFAULT_KUDU_COLUMN_TIMELINE_TIME);
+    this.column_timeline_message = 
conf.get(KuduReceiverConstants.KUDU_COLUMN_TIMELINE_MESSAGE_KEY,
+            KuduReceiverConstants.DEFAULT_KUDU_COLUMN_TIMELINE_MESSAGE);
+    this.column_timeline_span_id = 
conf.get(KuduReceiverConstants.KUDU_COLUMN_TIMELINE_SPANID_KEY,
+            KuduReceiverConstants.DEFAULT_KUDU_COLUMN_TIMELINE_SPANID);
+    this.column_timeline_timeline_id = 
conf.get(KuduReceiverConstants.KUDU_COLUMN_TIMELINE_TIMELINEID_KEY,
+            KuduReceiverConstants.DEFAULT_KUDU_COLUMN_TIMELINE_TIMELINEID);
+
     this.maxSpanBatchSize = 
conf.getInt(KuduReceiverConstants.MAX_SPAN_BATCH_SIZE_KEY,
             KuduReceiverConstants.DEFAULT_MAX_SPAN_BATCH_SIZE);
     if (this.service != null) {
@@ -159,9 +219,6 @@ public class KuduSpanReceiver extends SpanReceiver {
 
     @Override
     public void run() {
-      SpanProtos.Span.Builder sbuilder = SpanProtos.Span.newBuilder();
-      SpanProtos.TimelineAnnotation.Builder tlbuilder =
-              SpanProtos.TimelineAnnotation.newBuilder();
       List<Span> dequeuedSpans = new ArrayList<Span>(maxSpanBatchSize);
       long errorCount = 0;
       while (running.get() || queue.size() > 0) {
@@ -188,35 +245,35 @@ public class KuduSpanReceiver extends SpanReceiver {
         }
         try {
           for (Span span : dequeuedSpans) {
-            sbuilder.clear()
-                    .setTraceId(span.getSpanId().getHigh())
-                    .setStart(span.getStartTimeMillis())
-                    .setStop(span.getStopTimeMillis())
-                    .setSpanId(span.getSpanId().getLow())
-                    .setProcessId(span.getTracerId())
-                    .setDescription(span.getDescription());
-
+            KuduTable tableSpan = client.openTable(table_span);
+            Insert spanInsert = tableSpan.newInsert();
+            PartialRow spanRow = spanInsert.getRow();
+            spanRow.addLong(column_span_trace_id,span.getSpanId().getHigh());
+            spanRow.addLong(column_span_start_time,span.getStartTimeMillis());
+            spanRow.addLong(column_span_stop_time,span.getStopTimeMillis());
+            spanRow.addLong(column_span_span_id,span.getSpanId().getLow());
+            spanRow.addString(column_span_process_id,span.getTracerId());
             if (span.getParents().length == 0) {
-              sbuilder.setParentId(0);
+              spanRow.addLong(column_span_parent_id,0);
+              spanRow.addBoolean(column_span_parent,false);
             } else if (span.getParents().length > 0) {
-              sbuilder.setParentId(span.getParents()[0].getLow());
+              
spanRow.addLong(column_span_parent_id,span.getParents()[0].getLow());
+              spanRow.addBoolean(column_span_parent,true);
             }
+            spanRow.addString(column_span_description,span.getDescription());
+            this.session.apply(spanInsert);
+            long annotationCounter = 0;
             for (TimelineAnnotation ta : span.getTimelineAnnotations()) {
-              sbuilder.addTimeline(tlbuilder.clear()
-                      .setTime(ta.getTime())
-                      .setMessage(ta.getMessage())
-                      .build());
-            }
-            KuduTable tableRef = client.openTable(table);
-            Insert insert = tableRef.newInsert();
-            PartialRow row = insert.getRow();
-            row.addBinary(column_span_id, 
Bytes.fromLong(span.getSpanId().getHigh()));
-            row.addBinary(column_span, sbuilder.build().toByteArray());
-            if (span.getParents().length == 0) {
-              row.addBinary(column_root_span_start_time, 
Bytes.fromLong(span.getStartTimeMillis()));
-              row.addBinary(column_root_span, sbuilder.build().toByteArray());
+              annotationCounter++;
+              KuduTable tableTimeline = client.openTable(table_timeline);
+              Insert timelineInsert = tableTimeline.newInsert();
+              PartialRow timelineRow = timelineInsert.getRow();
+              
timelineRow.addLong(column_timeline_timeline_id,span.getSpanId().getHigh()+annotationCounter);
+              timelineRow.addLong(column_timeline_time,ta.getTime());
+              timelineRow.addString(column_timeline_message,ta.getMessage());
+              
timelineRow.addLong(column_timeline_span_id,span.getSpanId().getHigh());
+              this.session.apply(timelineInsert);
             }
-            this.session.apply(insert);
           }
           dequeuedSpans.clear();
           errorCount = 0;

Reply via email to