initial span receiver for kudu poc commit

Project: http://git-wip-us.apache.org/repos/asf/incubator-htrace/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-htrace/commit/6752709e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-htrace/tree/6752709e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-htrace/diff/6752709e

Branch: refs/heads/master
Commit: 6752709e6d078465f55f88233ec3357328aa6d1e
Parents: 1a11d6c
Author: nisalanirmana <[email protected]>
Authored: Mon Jun 13 16:45:01 2016 +0530
Committer: nisalanirmana <[email protected]>
Committed: Mon Jun 13 16:45:01 2016 +0530

----------------------------------------------------------------------
 htrace-kudu/pom.xml                             |  229 ++
 .../apache/htrace/KuduHTraceConfiguration.java  |   52 +
 .../htrace/impl/KuduClientConfiguration.java    |   87 +
 .../org/apache/htrace/impl/KuduConstants.java   |   49 +
 .../apache/htrace/impl/KuduSpanReceiver.java    |  261 ++
 .../htrace/protobuf/generated/SpanProtos.java   | 2241 ++++++++++++++++++
 htrace-kudu/src/main/protobuf/Span.proto        |   38 +
 htrace-kudu/src/test/resources/log4j.properties |   25 +
 pom.xml                                         |    1 +
 9 files changed, 2983 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/6752709e/htrace-kudu/pom.xml
----------------------------------------------------------------------
diff --git a/htrace-kudu/pom.xml b/htrace-kudu/pom.xml
new file mode 100644
index 0000000..60db72d
--- /dev/null
+++ b/htrace-kudu/pom.xml
@@ -0,0 +1,229 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more 
contributor
+license agreements. See the NOTICE file distributed with this work for 
additional
+information regarding copyright ownership. The ASF licenses this file to
+You under the Apache License, Version 2.0 (the "License"); you may not use
+this file except in compliance with the License. You may obtain a copy of
+the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+by applicable law or agreed to in writing, software distributed under the
+License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+OF ANY KIND, either express or implied. See the License for the specific
+language governing permissions and limitations under the License. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>htrace-kudu</artifactId>
+  <packaging>jar</packaging>
+
+  <parent>
+    <artifactId>htrace</artifactId>
+    <groupId>org.apache.htrace</groupId>
+    <version>4.1.0-incubating-SNAPSHOT</version>
+    <relativePath>..</relativePath>
+  </parent>
+
+  <name>htrace-kudu</name>
+  <description>
+    htrace-kudu is a tool for sending tracing information
+    to a Kudu database for later analysis.
+  </description>
+  <url>http://incubator.apache.org/projects/htrace.html</url>
+
+  <properties>
+    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    <kudu.version>1.0.0-SNAPSHOT</kudu.version>
+    <hadoop.version>2.6.0-cdh5.4.7</hadoop.version>
+    <createDependencyReducedPom>true</createDependencyReducedPom>
+  </properties>
+
+  <repositories>
+    <repository>
+      <id>cdh.repo</id>
+      <name>Cloudera Repositories</name>
+      <url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
+      <snapshots>
+        <enabled>false</enabled>
+      </snapshots>
+    </repository>
+  </repositories>
+
+  <build>
+    <resources>
+      <resource>
+        <directory>${basedir}/src/main</directory>
+        <includes>
+          <include>webapps/**</include>
+        </includes>
+      </resource>
+    </resources>
+    <plugins>
+      <plugin>
+        <artifactId>maven-assembly-plugin</artifactId>
+        <configuration>
+          <descriptorRefs>
+            <descriptorRef>jar-with-dependencies</descriptorRef>
+          </descriptorRefs>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.rat</groupId>
+        <artifactId>apache-rat-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-source-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <artifactId>maven-compiler-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-shade-plugin</artifactId>
+        <configuration>
+          
<createDependencyReducedPom>${createDependencyReducedPom}</createDependencyReducedPom>
+        </configuration>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <configuration>
+              <relocations>
+                <relocation>
+                  <pattern>org.apache.commons.logging</pattern>
+                  
<shadedPattern>org.apache.htrace.shaded.commons.logging</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>com.google.protobuf</pattern>
+                  
<shadedPattern>org.apache.htrace.shaded.com.google.protobuf</shadedPattern>
+                </relocation>
+              </relocations>
+            </configuration>
+            <goals>
+              <goal>shade</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <artifactId>maven-javadoc-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-gpg-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <!-- explicitly define maven-deploy-plugin after other to force exec 
order -->
+        <artifactId>maven-deploy-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+
+  <dependencies>
+    <!-- Module deps. -->
+    <dependency>
+      <groupId>org.apache.htrace</groupId>
+      <artifactId>htrace-core4</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.htrace</groupId>
+      <artifactId>htrace-core4</artifactId>
+      <version>${project.version}</version>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-java</artifactId>
+      <version>2.5.0</version>
+    </dependency>
+    <!-- Global deps. -->
+    <dependency>
+      <groupId>commons-logging</groupId>
+      <artifactId>commons-logging</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <!-- KUDU specific deps. -->
+    <dependency>
+      <groupId>org.kududb</groupId>
+      <artifactId>kudu-client</artifactId>
+      <version>${kudu.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client</artifactId>
+      <version>${hadoop.version}</version>
+    </dependency>
+  </dependencies>
+
+  <profiles>
+    <profile>
+      <id>compile-protobuf</id>
+      <activation>
+        <property>
+          <name>compile-protobuf</name>
+        </property>
+      </activation>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-maven-plugins</artifactId>
+            <version>2.7.1</version>
+            <configuration>
+              <protocVersion>${protobuf.version}</protocVersion>
+              <protocCommand>${protoc.path}</protocCommand>
+            </configuration>
+            <executions>
+              <execution>
+                <id>compile-protoc</id>
+                <phase>generate-sources</phase>
+                <goals>
+                  <goal>protoc</goal>
+                </goals>
+                <configuration>
+                  <imports>
+                    <param>${basedir}/src/main/protobuf</param>
+                  </imports>
+                  <source>
+                    <directory>${basedir}/src/main/protobuf</directory>
+                    <includes>
+                      <include>Span.proto</include>
+                    </includes>
+                  </source>
+                  <output>${basedir}/src/main/java/</output>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+    <profile>
+      <id>dist</id>
+      <build>
+        <plugins>
+          <plugin>
+            <!--Make it so assembly:single does nothing in here-->
+            <artifactId>maven-assembly-plugin</artifactId>
+            <configuration>
+              <skipAssembly>true</skipAssembly>
+            </configuration>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+    <profile>
+      <id>doclint-disable</id>
+      <activation><jdk>[1.8,)</jdk></activation>
+      <properties>
+         <additionalparam>-Xdoclint:none</additionalparam>
+      </properties>
+    </profile>
+  </profiles>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/6752709e/htrace-kudu/src/main/java/org/apache/htrace/KuduHTraceConfiguration.java
----------------------------------------------------------------------
diff --git 
a/htrace-kudu/src/main/java/org/apache/htrace/KuduHTraceConfiguration.java 
b/htrace-kudu/src/main/java/org/apache/htrace/KuduHTraceConfiguration.java
new file mode 100644
index 0000000..4a5e740
--- /dev/null
+++ b/htrace-kudu/src/main/java/org/apache/htrace/KuduHTraceConfiguration.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.htrace;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.htrace.core.HTraceConfiguration;
+
+/**
+ * Exposes a Hadoop {@link Configuration} through the
+ * {@link HTraceConfiguration} API.
+ *
+ * <p>Every lookup is forwarded to the wrapped configuration with
+ * {@link #KEY_PREFIX} prepended to the requested key.
+ */
+public class KuduHTraceConfiguration extends HTraceConfiguration {
+
+  /** Prefix prepended to every key before it is looked up. */
+  public static final String KEY_PREFIX = "kudu.";
+
+  /** The Hadoop configuration all lookups are delegated to. */
+  private final Configuration conf;
+
+  /**
+   * @param conf the Hadoop configuration to read the prefixed keys from
+   */
+  public KuduHTraceConfiguration(Configuration conf) {
+    this.conf = conf;
+  }
+
+  /** Returns {@code key} with {@link #KEY_PREFIX} in front of it. */
+  private static String prefixed(String key) {
+    return KEY_PREFIX + key;
+  }
+
+  @Override
+  public String get(String key) {
+    final String fullKey = prefixed(key);
+    return conf.get(fullKey);
+  }
+
+  @Override
+  public String get(String key, String defaultValue) {
+    final String fullKey = prefixed(key);
+    return conf.get(fullKey, defaultValue);
+  }
+
+  @Override
+  public boolean getBoolean(String key, boolean defaultValue) {
+    final String fullKey = prefixed(key);
+    return conf.getBoolean(fullKey, defaultValue);
+  }
+
+  @Override
+  public int getInt(String key, int defaultValue) {
+    final String fullKey = prefixed(key);
+    return conf.getInt(fullKey, defaultValue);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/6752709e/htrace-kudu/src/main/java/org/apache/htrace/impl/KuduClientConfiguration.java
----------------------------------------------------------------------
diff --git 
a/htrace-kudu/src/main/java/org/apache/htrace/impl/KuduClientConfiguration.java 
b/htrace-kudu/src/main/java/org/apache/htrace/impl/KuduClientConfiguration.java
new file mode 100644
index 0000000..4e99efc
--- /dev/null
+++ 
b/htrace-kudu/src/main/java/org/apache/htrace/impl/KuduClientConfiguration.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.htrace.impl;
+
+import org.kududb.client.KuduClient;
+import org.kududb.client.KuduClient.KuduClientBuilder;
+
+/**
+ * Collects the connection settings for a Kudu master and builds a
+ * {@link KuduClient} from them.
+ *
+ * <p>Host and port are mandatory. The worker count, boss count and the three
+ * timeouts are optional: each is only passed to the {@link KuduClientBuilder}
+ * if its setter has been called, otherwise the builder is left untouched for
+ * that option. Statistics are disabled on the builder unless
+ * {@link #setIsStatisticsEnabled(boolean)} was called with {@code true}.
+ */
+public class KuduClientConfiguration {
+
+  private final String host;
+  private final String port;
+  // The optional settings are boxed so that null can mean "never configured".
+  // With primitive fields an unset option is indistinguishable from an
+  // explicit 0 and would always be pushed to the builder.
+  private Integer workerCount;
+  private Integer bossCount;
+  private boolean isStatisticsEnabled;
+  private Long adminOperationTimeout;
+  private Long operationTimeout;
+  private Long socketReadTimeout;
+
+  /**
+   * @param host host name or address of the Kudu master
+   * @param port port of the Kudu master, as a string
+   */
+  public KuduClientConfiguration(String host, String port) {
+    this.host = host;
+    this.port = port;
+  }
+
+  /** Sets the worker count handed to {@link KuduClientBuilder#workerCount}. */
+  public void setWorkerCount(int workerCount) {
+    this.workerCount = workerCount;
+  }
+
+  /** Sets the boss count handed to {@link KuduClientBuilder#bossCount}. */
+  public void setBossCount(int bossCount) {
+    this.bossCount = bossCount;
+  }
+
+  /** Chooses whether the built client keeps statistics (off by default). */
+  public void setIsStatisticsEnabled(boolean isStatisticsEnabled) {
+    this.isStatisticsEnabled = isStatisticsEnabled;
+  }
+
+  /** Sets the default admin operation timeout, in milliseconds. */
+  public void setAdminOperationTimeout(long adminOperationTimeout) {
+    this.adminOperationTimeout = adminOperationTimeout;
+  }
+
+  /** Sets the default operation timeout, in milliseconds. */
+  public void setOperationTimeout(long operationTimeout) {
+    this.operationTimeout = operationTimeout;
+  }
+
+  /** Sets the default socket read timeout, in milliseconds. */
+  public void setSocketReadTimeout(long socketReadTimeout) {
+    this.socketReadTimeout = socketReadTimeout;
+  }
+
+  /**
+   * Builds a new client for the master at {@code host:port}, applying only
+   * the optional settings that were explicitly configured.
+   *
+   * @return a newly built {@link KuduClient}
+   */
+  public KuduClient buildClient() {
+    KuduClientBuilder builder = new KuduClient
+            .KuduClientBuilder(host.concat(":").concat(port));
+    if (workerCount != null) {
+      builder.workerCount(workerCount);
+    }
+    if (bossCount != null) {
+      builder.bossCount(bossCount);
+    }
+    if (!isStatisticsEnabled) {
+      builder.disableStatistics();
+    }
+    if (adminOperationTimeout != null) {
+      builder.defaultAdminOperationTimeoutMs(adminOperationTimeout);
+    }
+    if (operationTimeout != null) {
+      builder.defaultOperationTimeoutMs(operationTimeout);
+    }
+    if (socketReadTimeout != null) {
+      builder.defaultSocketReadTimeoutMs(socketReadTimeout);
+    }
+    return builder.build();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/6752709e/htrace-kudu/src/main/java/org/apache/htrace/impl/KuduConstants.java
----------------------------------------------------------------------
diff --git 
a/htrace-kudu/src/main/java/org/apache/htrace/impl/KuduConstants.java 
b/htrace-kudu/src/main/java/org/apache/htrace/impl/KuduConstants.java
new file mode 100644
index 0000000..4092b8b
--- /dev/null
+++ b/htrace-kudu/src/main/java/org/apache/htrace/impl/KuduConstants.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.htrace.impl;
+
+/**
+ * Configuration keys, and default values where one is defined, for the Kudu
+ * span receiver. Constants ending in {@code _KEY} are configuration key
+ * names; constants starting with {@code DEFAULT_} are fallback values.
+ */
+public class KuduConstants {
+
+  // ---- Kudu master location -------------------------------------------
+  public static final String KUDU_MASTER_HOST_KEY = "htrace.kudu.master.host";
+  public static final String KUDU_MASTER_PORT_KEY = "htrace.kudu.master.port";
+  public static final String DEFAULT_KUDU_MASTER_HOST = "127.0.0.1";
+  public static final String DEFAULT_KUDU_MASTER_PORT = "7051";
+
+  // ---- Span queueing, batching and worker threads -----------------------
+  public static final String SPAN_BLOCKING_QUEUE_SIZE_KEY = "htrace.kudu.span.queue.size";
+  public static final String MAX_SPAN_BATCH_SIZE_KEY = "htrace.kudu.batch.size";
+  public static final String NUM_PARALLEL_THREADS_KEY = "htrace.kudu.num.threads";
+  public static final int DEFAULT_SPAN_BLOCKING_QUEUE_SIZE = 1000;
+  public static final int DEFAULT_MAX_SPAN_BATCH_SIZE = 100;
+  public static final int DEFAULT_NUM_PARALLEL_THREADS = 1;
+
+  // ---- Target table and its column names --------------------------------
+  public static final String KUDU_TABLE_KEY = "htrace.kudu.table";
+  public static final String KUDU_COLUMN_SPAN_ID_KEY = "htrace.kudu.column.spanid";
+  public static final String KUDU_COLUMN_SPAN_KEY = "htrace.kudu.column.span";
+  public static final String KUDU_COLUMN_ROOT_SPAN_KEY = "htrace.kudu.column.rootspan";
+  public static final String KUDU_COLUMN_ROOT_SPAN_START_TIME_KEY =
+          "htrace.kudu.column.rootspan.starttime";
+  public static final String DEFAULT_KUDU_TABLE = "htrace";
+  public static final String DEFAULT_KUDU_COLUMN_SPAN_ID = "span_id";
+  public static final String DEFAULT_KUDU_COLUMN_SPAN = "span";
+  public static final String DEFAULT_KUDU_COLUMN_ROOT_SPAN = "root_span";
+  public static final String DEFAULT_KUDU_COLUMN_ROOT_SPAN_START_TIME = "root_span_start_time";
+
+  // ---- Kudu client tuning (keys only; no defaults are defined here) -----
+  public static final String KUDU_CLIENT_WORKER_COUNT_KEY = "htrace.kudu.client.worker.count";
+  public static final String KUDU_CLIENT_BOSS_COUNT_KEY = "htrace.kudu.client.boss.count";
+  public static final String KUDU_CLIENT_STATISTICS_ENABLED_KEY =
+          "htrace.kudu.client.statistics.enabled";
+  public static final String KUDU_CLIENT_TIMEOUT_ADMIN_OPERATION_KEY =
+          "htrace.kudu.client.timeout.admin.operation";
+  public static final String KUDU_CLIENT_TIMEOUT_OPERATION_KEY =
+          "htrace.kudu.client.timeout.operation";
+  public static final String KUDU_CLIENT_TIMEOUT_SOCKET_READ_KEY =
+          "htrace.kudu.client.timeout.socket.read";
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/6752709e/htrace-kudu/src/main/java/org/apache/htrace/impl/KuduSpanReceiver.java
----------------------------------------------------------------------
diff --git 
a/htrace-kudu/src/main/java/org/apache/htrace/impl/KuduSpanReceiver.java 
b/htrace-kudu/src/main/java/org/apache/htrace/impl/KuduSpanReceiver.java
new file mode 100644
index 0000000..da7aca3
--- /dev/null
+++ b/htrace-kudu/src/main/java/org/apache/htrace/impl/KuduSpanReceiver.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.htrace.impl;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.htrace.core.HTraceConfiguration;
+import org.apache.htrace.core.Span;
+import org.apache.htrace.core.SpanReceiver;
+import org.apache.htrace.core.TimelineAnnotation;
+import org.apache.htrace.protobuf.generated.SpanProtos;
+import org.kududb.client.Bytes;
+import org.kududb.client.KuduClient;
+import org.kududb.client.KuduSession;
+import org.kududb.client.KuduTable;
+import org.kududb.client.Insert;
+import org.kududb.client.PartialRow;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * A {@link SpanReceiver} that writes HTrace spans to a Kudu table.
+ *
+ * <p>Spans passed to {@link #receiveSpan(Span)} are put on a bounded in-memory
+ * queue. A fixed pool of daemon worker threads, started by the constructor,
+ * drains that queue in batches, serializes every span with protobuf and
+ * inserts it into Kudu. Each worker lazily creates its own {@link KuduClient}
+ * and {@link KuduSession} and releases both when it stops.
+ */
+public class KuduSpanReceiver extends SpanReceiver {
+
+  private static final Log LOG = LogFactory.getLog(KuduSpanReceiver.class);
+
+  /** Seconds {@link #close()} waits for the worker threads to finish. */
+  private static final int SHUTDOWN_TIMEOUT = 30;
+  /** Consecutive failed writes after which failing batches are dropped. */
+  private static final int MAX_ERRORS = 10;
+  /** Pause after a failed write before the next attempt, in milliseconds. */
+  private static final long RETRY_PAUSE_MS = 500;
+
+  private final BlockingQueue<Span> queue;
+  private final AtomicBoolean running = new AtomicBoolean(true);
+  private final KuduClientConfiguration clientConf;
+  private final int maxSpanBatchSize;
+  private final ThreadFactory threadFactory = new ThreadFactory() {
+    private final AtomicLong receiverIndex = new AtomicLong(0);
+
+    @Override
+    public Thread newThread(Runnable runnable) {
+      Thread thread = new Thread(runnable);
+      thread.setDaemon(true);
+      thread.setName(String.format("kuduSpanReceiver-%d",
+              receiverIndex.getAndIncrement()));
+      return thread;
+    }
+  };
+  private final ExecutorService service;
+  private final String table;
+  private final String columnSpanId;
+  private final String columnSpan;
+  private final String columnRootSpan;
+  private final String columnRootSpanStartTime;
+
+  /**
+   * Reads the receiver settings from {@code conf} and starts the worker
+   * threads.
+   *
+   * @param conf configuration holding the keys declared in
+   *             {@link KuduConstants}
+   * @throws NumberFormatException if an optional numeric client setting is
+   *         present but is not a valid number
+   */
+  public KuduSpanReceiver(HTraceConfiguration conf) {
+    this.clientConf = new KuduClientConfiguration(
+            conf.get(KuduConstants.KUDU_MASTER_HOST_KEY,
+                    KuduConstants.DEFAULT_KUDU_MASTER_HOST),
+            conf.get(KuduConstants.KUDU_MASTER_PORT_KEY,
+                    KuduConstants.DEFAULT_KUDU_MASTER_PORT));
+    applyOptionalClientSettings(conf, this.clientConf);
+    this.queue = new ArrayBlockingQueue<Span>(
+            conf.getInt(KuduConstants.SPAN_BLOCKING_QUEUE_SIZE_KEY,
+                    KuduConstants.DEFAULT_SPAN_BLOCKING_QUEUE_SIZE));
+    this.table = conf.get(KuduConstants.KUDU_TABLE_KEY,
+            KuduConstants.DEFAULT_KUDU_TABLE);
+    this.columnSpanId = conf.get(KuduConstants.KUDU_COLUMN_SPAN_ID_KEY,
+            KuduConstants.DEFAULT_KUDU_COLUMN_SPAN_ID);
+    this.columnSpan = conf.get(KuduConstants.KUDU_COLUMN_SPAN_KEY,
+            KuduConstants.DEFAULT_KUDU_COLUMN_SPAN);
+    this.columnRootSpan = conf.get(KuduConstants.KUDU_COLUMN_ROOT_SPAN_KEY,
+            KuduConstants.DEFAULT_KUDU_COLUMN_ROOT_SPAN);
+    this.columnRootSpanStartTime =
+            conf.get(KuduConstants.KUDU_COLUMN_ROOT_SPAN_START_TIME_KEY,
+                    KuduConstants.DEFAULT_KUDU_COLUMN_ROOT_SPAN_START_TIME);
+    this.maxSpanBatchSize = conf.getInt(KuduConstants.MAX_SPAN_BATCH_SIZE_KEY,
+            KuduConstants.DEFAULT_MAX_SPAN_BATCH_SIZE);
+    int numThreads = conf.getInt(KuduConstants.NUM_PARALLEL_THREADS_KEY,
+            KuduConstants.DEFAULT_NUM_PARALLEL_THREADS);
+    this.service = Executors.newFixedThreadPool(numThreads, threadFactory);
+    // Started last so that every field the workers read is already assigned.
+    for (int i = 0; i < numThreads; i++) {
+      this.service.submit(new WriteSpanRunnable());
+    }
+  }
+
+  /**
+   * Copies the optional Kudu client settings from {@code conf} to
+   * {@code target}. A numeric setting is only applied when its key is present.
+   *
+   * <p>The previous code used {@code Integer.valueOf(null)} and
+   * {@code String.valueOf(null)} as "no default" markers. Those calls resolve
+   * to {@code valueOf(String)} and {@code valueOf(char[])} and throw, so the
+   * constructor could never complete.
+   */
+  private static void applyOptionalClientSettings(HTraceConfiguration conf,
+                                                  KuduClientConfiguration target) {
+    String bossCount = conf.get(KuduConstants.KUDU_CLIENT_BOSS_COUNT_KEY);
+    if (bossCount != null) {
+      target.setBossCount(Integer.parseInt(bossCount.trim()));
+    }
+    String workerCount = conf.get(KuduConstants.KUDU_CLIENT_WORKER_COUNT_KEY);
+    if (workerCount != null) {
+      target.setWorkerCount(Integer.parseInt(workerCount.trim()));
+    }
+    // Boolean.valueOf((String) null) is false, so false was the old default.
+    target.setIsStatisticsEnabled(
+            conf.getBoolean(KuduConstants.KUDU_CLIENT_STATISTICS_ENABLED_KEY, false));
+    String adminTimeout = conf.get(KuduConstants.KUDU_CLIENT_TIMEOUT_ADMIN_OPERATION_KEY);
+    if (adminTimeout != null) {
+      target.setAdminOperationTimeout(Long.parseLong(adminTimeout.trim()));
+    }
+    String operationTimeout = conf.get(KuduConstants.KUDU_CLIENT_TIMEOUT_OPERATION_KEY);
+    if (operationTimeout != null) {
+      target.setOperationTimeout(Long.parseLong(operationTimeout.trim()));
+    }
+    String socketReadTimeout = conf.get(KuduConstants.KUDU_CLIENT_TIMEOUT_SOCKET_READ_KEY);
+    if (socketReadTimeout != null) {
+      target.setSocketReadTimeout(Long.parseLong(socketReadTimeout.trim()));
+    }
+  }
+
+  /**
+   * Stops accepting spans and waits up to {@link #SHUTDOWN_TIMEOUT} seconds
+   * for the workers to write what is still queued.
+   */
+  @Override
+  public void close() throws IOException {
+    running.set(false);
+    service.shutdown();
+    try {
+      if (!service.awaitTermination(SHUTDOWN_TIMEOUT, TimeUnit.SECONDS)) {
+        LOG.error("Timeout " + SHUTDOWN_TIMEOUT + " " + TimeUnit.SECONDS +
+                " reached while shutting worker threads which process enqued spans." +
+                " Enqueued spans which are left in blocking queue is dropped.");
+      }
+    } catch (InterruptedException e) {
+      LOG.warn("Interrupted exception occured while terminating thread service executor.", e);
+      // Keep the interrupt visible to the caller of close().
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /**
+   * Queues {@code span} for writing. The span is dropped, with an error log,
+   * if the queue is full, and silently ignored after {@link #close()}.
+   */
+  @Override
+  public void receiveSpan(Span span) {
+    if (!running.get()) {
+      return;
+    }
+    if (!this.queue.offer(span)) {
+      LOG.error("Error trying to enqueue span ("
+              + span.getDescription()
+              + ") to the queue. Blocking Queue is currently reached its capacity.");
+    }
+  }
+
+  /**
+   * Worker that drains the span queue in batches and inserts the spans into
+   * Kudu. Runs until the receiver is closed and the queue is empty, or until
+   * its thread is interrupted.
+   */
+  private class WriteSpanRunnable implements Runnable {
+
+    private KuduSession session;
+    private KuduClient client;
+
+    @Override
+    public void run() {
+      SpanProtos.Span.Builder sbuilder = SpanProtos.Span.newBuilder();
+      SpanProtos.TimelineAnnotation.Builder tlbuilder =
+              SpanProtos.TimelineAnnotation.newBuilder();
+      List<Span> dequeuedSpans = new ArrayList<Span>(maxSpanBatchSize);
+      long errorCount = 0;
+      try {
+        while (running.get() || !queue.isEmpty()) {
+          try {
+            Span firstSpan = queue.poll(1, TimeUnit.SECONDS);
+            if (firstSpan != null) {
+              dequeuedSpans.add(firstSpan);
+              queue.drainTo(dequeuedSpans, maxSpanBatchSize - 1);
+            }
+          } catch (InterruptedException ie) {
+            LOG.error("Interrupted Exception occurred while polling to " +
+                    "retrieve first span from blocking queue");
+            // Interruption is a request to stop: keep the flag set and leave.
+            Thread.currentThread().interrupt();
+            break;
+          }
+          if (dequeuedSpans.isEmpty()) {
+            flushIdleSession();
+            continue;
+          }
+          try {
+            startSession();
+            writeBatch(dequeuedSpans, sbuilder, tlbuilder);
+            errorCount = 0;
+          } catch (Exception e) {
+            errorCount += 1;
+            LOG.error("Exception occured while writing " + dequeuedSpans.size()
+                    + " span(s) to kudu datastore (consecutive failure "
+                    + errorCount + ").", e);
+            if (errorCount < MAX_ERRORS) {
+              requeue(dequeuedSpans);
+            } else {
+              LOG.error("Dropped " + dequeuedSpans.size() + " dequeued span(s) after "
+                      + errorCount + " consecutive failed writes to kudu datastore.");
+            }
+            closeSession();
+            try {
+              Thread.sleep(RETRY_PAUSE_MS);
+            } catch (InterruptedException e1) {
+              LOG.error("Interrupted Exception occurred while allowing kudu to re-stabilized");
+              // The next poll sees the flag and ends the loop.
+              Thread.currentThread().interrupt();
+            }
+          } finally {
+            // The batch was written, re-queued or dropped. Keeping it here
+            // would write the same spans again on the next pass.
+            dequeuedSpans.clear();
+          }
+        }
+      } finally {
+        closeSession();
+        closeClient();
+      }
+    }
+
+    /** Flushes the session while there is nothing to write. */
+    private void flushIdleSession() {
+      try {
+        startSession();
+        this.session.flush();
+      } catch (Exception e) {
+        LOG.error("Failed to flush writes to Kudu.", e);
+        closeSession();
+      }
+    }
+
+    /**
+     * Serializes every span in {@code spans} and applies one insert per span
+     * to the current session.
+     *
+     * @throws Exception if the table cannot be opened or an insert fails
+     */
+    private void writeBatch(List<Span> spans,
+                            SpanProtos.Span.Builder sbuilder,
+                            SpanProtos.TimelineAnnotation.Builder tlbuilder)
+            throws Exception {
+      // Open the table once per batch rather than once per span.
+      KuduTable tableRef = client.openTable(table);
+      for (Span span : spans) {
+        boolean isRootSpan = span.getParents().length == 0;
+        sbuilder.clear()
+                .setTraceId(span.getSpanId().getHigh())
+                .setStart(span.getStartTimeMillis())
+                .setStop(span.getStopTimeMillis())
+                .setSpanId(span.getSpanId().getLow())
+                .setProcessId(span.getTracerId())
+                .setDescription(span.getDescription());
+        // Only the first parent is stored; 0 marks a span without parents.
+        sbuilder.setParentId(isRootSpan ? 0 : span.getParents()[0].getLow());
+        for (TimelineAnnotation ta : span.getTimelineAnnotations()) {
+          sbuilder.addTimeline(tlbuilder.clear()
+                  .setTime(ta.getTime())
+                  .setMessage(ta.getMessage())
+                  .build());
+        }
+        byte[] serializedSpan = sbuilder.build().toByteArray();
+        Insert insert = tableRef.newInsert();
+        PartialRow row = insert.getRow();
+        // NOTE(review): only the high half of the span id is stored in this
+        // column. Confirm against the table schema that this is unique enough
+        // if the column is part of the primary key.
+        row.addBinary(columnSpanId, Bytes.fromLong(span.getSpanId().getHigh()));
+        row.addBinary(columnSpan, serializedSpan);
+        if (isRootSpan) {
+          row.addBinary(columnRootSpanStartTime, Bytes.fromLong(span.getStartTimeMillis()));
+          row.addBinary(columnRootSpan, serializedSpan);
+        }
+        this.session.apply(insert);
+      }
+    }
+
+    /** Puts {@code spans} back on the queue, logging those that do not fit. */
+    private void requeue(List<Span> spans) {
+      int dropped = 0;
+      for (Span span : spans) {
+        if (!queue.offer(span)) {
+          dropped++;
+        }
+      }
+      if (dropped > 0) {
+        LOG.error("Trying to re-enqueue de-queued spans to blocking queue for writing "
+                + "but failed. Dropped " + dropped + " dequeued span(s) which were due "
+                + "written into kudu datastore");
+      }
+    }
+
+    /** Closes the current session, if any, so that the next use opens a new one. */
+    private void closeSession() {
+      if (this.session == null) {
+        return;
+      }
+      try {
+        this.session.close();
+      } catch (java.lang.Exception e) {
+        LOG.warn("Failed to close Kudu session. " + e.getMessage());
+      } finally {
+        // Forget the session even if close() failed; otherwise startSession()
+        // would keep handing out the broken one.
+        this.session = null;
+      }
+    }
+
+    /** Releases the Kudu client created by this worker, if any. */
+    private void closeClient() {
+      if (this.client == null) {
+        return;
+      }
+      try {
+        this.client.close();
+      } catch (java.lang.Exception e) {
+        LOG.warn("Failed to close Kudu client. " + e.getMessage());
+      } finally {
+        this.client = null;
+      }
+    }
+
+    /** Lazily builds the client and opens a session if none is open. */
+    private void startSession() {
+      if (this.session == null) {
+        if (this.client == null) {
+          client = clientConf.buildClient();
+        }
+        session = client.newSession();
+      }
+    }
+  }
+
+}

Reply via email to