Adding Colin's review changes to the span receiver
Project: http://git-wip-us.apache.org/repos/asf/incubator-htrace/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-htrace/commit/a6ca3ccf Tree: http://git-wip-us.apache.org/repos/asf/incubator-htrace/tree/a6ca3ccf Diff: http://git-wip-us.apache.org/repos/asf/incubator-htrace/diff/a6ca3ccf Branch: refs/heads/master Commit: a6ca3ccf8007f41a112baf98328b9311b7148e7b Parents: 5d38e87 Author: Nisala Nirmana <[email protected]> Authored: Mon Aug 8 02:55:35 2016 +0530 Committer: Nisala Nirmana <[email protected]> Committed: Mon Aug 8 02:55:35 2016 +0530 ---------------------------------------------------------------------- htrace-kudu/pom.xml | 51 - .../htrace/impl/KuduClientConfiguration.java | 35 +- .../htrace/impl/KuduReceiverConstants.java | 73 +- .../apache/htrace/impl/KuduSpanReceiver.java | 171 +- .../htrace/protobuf/generated/SpanProtos.java | 2241 ------------------ htrace-kudu/src/main/protobuf/Span.proto | 38 - .../htrace/impl/TestKuduSpanReceiver.java | 144 +- 7 files changed, 295 insertions(+), 2458 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/a6ca3ccf/htrace-kudu/pom.xml ---------------------------------------------------------------------- diff --git a/htrace-kudu/pom.xml b/htrace-kudu/pom.xml index 58140a6..e977bd9 100644 --- a/htrace-kudu/pom.xml +++ b/htrace-kudu/pom.xml @@ -91,10 +91,6 @@ language governing permissions and limitations under the License. --> <pattern>org.apache.commons.logging</pattern> <shadedPattern>org.apache.htrace.shaded.commons.logging</shadedPattern> </relocation> - <relocation> - <pattern>com.google.protobuf</pattern> - <shadedPattern>org.apache.htrace.shaded.com.google.protobuf</shadedPattern> - </relocation> </relocations> </configuration> <goals> @@ -132,11 +128,6 @@ language governing permissions and limitations under the License. 
--> <classifier>tests</classifier> <scope>test</scope> </dependency> - <dependency> - <groupId>com.google.protobuf</groupId> - <artifactId>protobuf-java</artifactId> - <version>2.5.0</version> - </dependency> <!-- Global deps. --> <dependency> <groupId>commons-logging</groupId> @@ -169,48 +160,6 @@ language governing permissions and limitations under the License. --> <profiles> <profile> - <id>compile-protobuf</id> - <activation> - <property> - <name>compile-protobuf</name> - </property> - </activation> - <build> - <plugins> - <plugin> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-maven-plugins</artifactId> - <version>2.7.1</version> - <configuration> - <protocVersion>${protobuf.version}</protocVersion> - <protocCommand>${protoc.path}</protocCommand> - </configuration> - <executions> - <execution> - <id>compile-protoc</id> - <phase>generate-sources</phase> - <goals> - <goal>protoc</goal> - </goals> - <configuration> - <imports> - <param>${basedir}/src/main/protobuf</param> - </imports> - <source> - <directory>${basedir}/src/main/protobuf</directory> - <includes> - <include>Span.proto</include> - </includes> - </source> - <output>${basedir}/src/main/java/</output> - </configuration> - </execution> - </executions> - </plugin> - </plugins> - </build> - </profile> - <profile> <id>dist</id> <build> <plugins> http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/a6ca3ccf/htrace-kudu/src/main/java/org/apache/htrace/impl/KuduClientConfiguration.java ---------------------------------------------------------------------- diff --git a/htrace-kudu/src/main/java/org/apache/htrace/impl/KuduClientConfiguration.java b/htrace-kudu/src/main/java/org/apache/htrace/impl/KuduClientConfiguration.java index c58dbb2..742f0ef 100644 --- a/htrace-kudu/src/main/java/org/apache/htrace/impl/KuduClientConfiguration.java +++ b/htrace-kudu/src/main/java/org/apache/htrace/impl/KuduClientConfiguration.java @@ -22,20 +22,35 @@ import 
org.kududb.client.KuduClient.KuduClientBuilder; public class KuduClientConfiguration { - private String host; - private String port; - private Integer workerCount; - private Integer bossCount; - private Boolean isStatisticsEnabled; - private Long adminOperationTimeout; - private Long operationTimeout; - private Long socketReadTimeout; + private final String host; + private final String port; + private final Integer workerCount; + private final Integer bossCount; + private final Boolean isStatisticsEnabled; + private final Long adminOperationTimeout; + private final Long operationTimeout; + private final Long socketReadTimeout; + + public KuduClientConfiguration(String host, + String port, + Integer workerCount, + Integer bossCount, + Boolean isStatisticsEnabled, + Long adminOperationTimeout, + Long operationTimeout, + Long socketReadTimeout) { - public KuduClientConfiguration(String host, String port) { this.host = host; this.port = port; + this.workerCount = workerCount; + this.bossCount = bossCount; + this.isStatisticsEnabled = isStatisticsEnabled; + this.adminOperationTimeout = adminOperationTimeout; + this.operationTimeout = operationTimeout; + this.socketReadTimeout = socketReadTimeout; } + /* public void setWorkerCount(int workerCount) { this.workerCount = workerCount; } @@ -60,6 +75,8 @@ public class KuduClientConfiguration { this.socketReadTimeout = socketReadTimeout; } + */ + public KuduClient buildClient() { KuduClientBuilder builder = new KuduClient .KuduClientBuilder(host.concat(":").concat(port)); http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/a6ca3ccf/htrace-kudu/src/main/java/org/apache/htrace/impl/KuduReceiverConstants.java ---------------------------------------------------------------------- diff --git a/htrace-kudu/src/main/java/org/apache/htrace/impl/KuduReceiverConstants.java b/htrace-kudu/src/main/java/org/apache/htrace/impl/KuduReceiverConstants.java index a605dfe..be98311 100644 --- 
a/htrace-kudu/src/main/java/org/apache/htrace/impl/KuduReceiverConstants.java +++ b/htrace-kudu/src/main/java/org/apache/htrace/impl/KuduReceiverConstants.java @@ -19,31 +19,52 @@ package org.apache.htrace.impl; public class KuduReceiverConstants { - public static final String KUDU_MASTER_HOST_KEY = "htrace.kudu.master.host"; - public static final String DEFAULT_KUDU_MASTER_HOST = "127.0.0.1"; - public static final String KUDU_MASTER_PORT_KEY = "htrace.kudu.master.port"; - public static final String DEFAULT_KUDU_MASTER_PORT = "7051"; - public static final String SPAN_BLOCKING_QUEUE_SIZE_KEY = "htrace.kudu.span.queue.size"; - public static final int DEFAULT_SPAN_BLOCKING_QUEUE_SIZE = 1000; - public static final String KUDU_TABLE_KEY = "htrace.kudu.table"; - public static final String DEFAULT_KUDU_TABLE = "htrace"; - public static final String MAX_SPAN_BATCH_SIZE_KEY = "htrace.kudu.batch.size"; - public static final int DEFAULT_MAX_SPAN_BATCH_SIZE = 100; - public static final String NUM_PARALLEL_THREADS_KEY = "htrace.kudu.num.threads"; - public static final int DEFAULT_NUM_PARALLEL_THREADS = 1; - public static final String KUDU_COLUMN_SPAN_ID_KEY = "htrace.kudu.column.spanid"; - public static final String DEFAULT_KUDU_COLUMN_SPAN_ID = "span_id"; - public static final String KUDU_COLUMN_SPAN_KEY = "htrace.kudu.column.span"; - public static final String DEFAULT_KUDU_COLUMN_SPAN = "span"; - public static final String KUDU_COLUMN_ROOT_SPAN_KEY = "htrace.kudu.column.rootspan"; - public static final String DEFAULT_KUDU_COLUMN_ROOT_SPAN = "root_span"; - public static final String KUDU_COLUMN_ROOT_SPAN_START_TIME_KEY = "htrace.kudu.column.rootspan.starttime"; - public static final String DEFAULT_KUDU_COLUMN_ROOT_SPAN_START_TIME = "root_span_start_time"; - public static final String KUDU_CLIENT_WORKER_COUNT_KEY = "htrace.kudu.client.worker.count"; - public static final String KUDU_CLIENT_BOSS_COUNT_KEY = "htrace.kudu.client.boss.count"; - public static final String 
KUDU_CLIENT_STATISTICS_ENABLED_KEY = "htrace.kudu.client.statistics.enabled"; - public static final String KUDU_CLIENT_TIMEOUT_ADMIN_OPERATION_KEY = "htrace.kudu.client.timeout.admin.operation"; - public static final String KUDU_CLIENT_TIMEOUT_OPERATION_KEY = "htrace.kudu.client.timeout.operation"; - public static final String KUDU_CLIENT_TIMEOUT_SOCKET_READ_KEY = "htrace.kudu.client.timeout.socket.read"; + static final String KUDU_MASTER_HOST_KEY = "kudu.master.host"; + static final String DEFAULT_KUDU_MASTER_HOST = "127.0.0.1"; + static final String KUDU_MASTER_PORT_KEY = "kudu.master.port"; + static final String DEFAULT_KUDU_MASTER_PORT = "7051"; + static final String SPAN_BLOCKING_QUEUE_SIZE_KEY = "kudu.span.queue.size"; + static final int DEFAULT_SPAN_BLOCKING_QUEUE_SIZE = 1000; + static final String KUDU_SPAN_TABLE_KEY = "kudu.span.table"; + static final String DEFAULT_KUDU_SPAN_TABLE = "span"; + static final String KUDU_SPAN_TIMELINE_ANNOTATION_TABLE_KEY = "kudu.span.timeline.annotation.table"; + static final String DEFAULT_KUDU_SPAN_TIMELINE_ANNOTATION_TABLE = "span.timeline"; + static final String MAX_SPAN_BATCH_SIZE_KEY = "kudu.batch.size"; + static final int DEFAULT_MAX_SPAN_BATCH_SIZE = 100; + static final String NUM_PARALLEL_THREADS_KEY = "kudu.num.threads"; + static final int DEFAULT_NUM_PARALLEL_THREADS = 1; + static final String KUDU_COLUMN_SPAN_TRACE_ID_KEY = "kudu.column.span.traceid"; + static final String DEFAULT_KUDU_COLUMN_SPAN_TRACE_ID = "trace_id"; + static final String KUDU_COLUMN_SPAN_START_TIME_KEY = "kudu.column.span.starttime"; + static final String DEFAULT_KUDU_COLUMN_SPAN_START_TIME = "start_time"; + static final String KUDU_COLUMN_SPAN_STOP_TIME_KEY = "kudu.column.span.stoptime"; + static final String DEFAULT_KUDU_COLUMN_SPAN_STOP_TIME = "stop_time"; + static final String KUDU_COLUMN_SPAN_SPAN_ID_KEY = "kudu.column.span.spanid"; + static final String DEFAULT_KUDU_COLUMN_SPAN_SPAN_ID = "span_id"; + static final String 
KUDU_COLUMN_SPAN_PROCESS_ID_KEY = "kudu.column.span.processid"; + static final String DEFAULT_KUDU_COLUMN_SPAN_PROCESS_ID = "process_id"; + static final String KUDU_COLUMN_SPAN_PARENT_ID_KEY = "kudu.column.span.parentid"; + static final String DEFAULT_KUDU_COLUMN_SPAN_PARENT_ID = "parent_id"; + static final String KUDU_COLUMN_SPAN_DESCRIPTION_KEY = "kudu.column.span.description"; + static final String DEFAULT_KUDU_COLUMN_SPAN_DESCRIPTION = "description"; + static final String KUDU_COLUMN_SPAN_PARENT_KEY = "kudu.column.span.parent"; + static final String DEFAULT_KUDU_COLUMN_SPAN_PARENT = "parent"; + static final String KUDU_COLUMN_TIMELINE_TIME_KEY = "kudu.column.timeline.time"; + static final String DEFAULT_KUDU_COLUMN_TIMELINE_TIME = "time"; + static final String KUDU_COLUMN_TIMELINE_MESSAGE_KEY = "kudu.column.timeline.message"; + static final String DEFAULT_KUDU_COLUMN_TIMELINE_MESSAGE = "message"; + static final String KUDU_COLUMN_TIMELINE_SPANID_KEY = "kudu.column.timeline.spanid"; + static final String DEFAULT_KUDU_COLUMN_TIMELINE_SPANID = "spanid"; + static final String KUDU_COLUMN_TIMELINE_TIMELINEID_KEY = "kudu.column.timeline.timelineid"; + static final String DEFAULT_KUDU_COLUMN_TIMELINE_TIMELINEID = "timelineid"; + static final String KUDU_CLIENT_WORKER_COUNT_KEY = "kudu.client.worker.count"; + static final String KUDU_CLIENT_BOSS_COUNT_KEY = "kudu.client.boss.count"; + static final String KUDU_CLIENT_STATISTICS_ENABLED_KEY = "kudu.client.statistics.enabled"; + static final String KUDU_CLIENT_TIMEOUT_ADMIN_OPERATION_KEY = "kudu.client.timeout.admin.operation"; + static final String KUDU_CLIENT_TIMEOUT_OPERATION_KEY = "kudu.client.timeout.operation"; + static final String KUDU_CLIENT_TIMEOUT_SOCKET_READ_KEY = "kudu.client.timeout.socket.read"; } + + + http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/a6ca3ccf/htrace-kudu/src/main/java/org/apache/htrace/impl/KuduSpanReceiver.java 
---------------------------------------------------------------------- diff --git a/htrace-kudu/src/main/java/org/apache/htrace/impl/KuduSpanReceiver.java b/htrace-kudu/src/main/java/org/apache/htrace/impl/KuduSpanReceiver.java index ed3e093..46c324a 100644 --- a/htrace-kudu/src/main/java/org/apache/htrace/impl/KuduSpanReceiver.java +++ b/htrace-kudu/src/main/java/org/apache/htrace/impl/KuduSpanReceiver.java @@ -23,8 +23,6 @@ import org.apache.htrace.core.HTraceConfiguration; import org.apache.htrace.core.Span; import org.apache.htrace.core.SpanReceiver; import org.apache.htrace.core.TimelineAnnotation; -import org.apache.htrace.protobuf.generated.SpanProtos; -import org.kududb.client.Bytes; import org.kududb.client.KuduClient; import org.kududb.client.KuduSession; import org.kududb.client.KuduTable; @@ -65,51 +63,113 @@ public class KuduSpanReceiver extends SpanReceiver { return thread; } }; + private ExecutorService service; - private String table; - private String column_span_id; - private String column_span; - private String column_root_span; - private String column_root_span_start_time; + + private String table_span; + private String column_span_trace_id; + private String column_span_start_time; + private String column_span_stop_time; + private String column_span_span_id; + private String column_span_process_id; + private String column_span_parent_id; + private String column_span_description; + private String column_span_parent; + + private String table_timeline; + private String column_timeline_timeline_id; + private String column_timeline_time; + private String column_timeline_message; + private String column_timeline_span_id; public KuduSpanReceiver(HTraceConfiguration conf) { - this.clientConf = - new KuduClientConfiguration(conf.get(KuduReceiverConstants.KUDU_MASTER_HOST_KEY, - KuduReceiverConstants.DEFAULT_KUDU_MASTER_HOST), - conf.get(KuduReceiverConstants.KUDU_MASTER_PORT_KEY, - KuduReceiverConstants.DEFAULT_KUDU_MASTER_PORT)); + + String masterHost; + 
String masterPort; + Integer workerCount; + Integer bossCount; + Boolean isStatisticsEnabled; + Long adminOperationTimeout; + Long operationTimeout; + Long socketReadTimeout; + + masterHost = conf.get(KuduReceiverConstants.KUDU_MASTER_HOST_KEY, + KuduReceiverConstants.DEFAULT_KUDU_MASTER_HOST); + masterPort = conf.get(KuduReceiverConstants.KUDU_MASTER_PORT_KEY, + KuduReceiverConstants.DEFAULT_KUDU_MASTER_PORT); + if (conf.get(KuduReceiverConstants.KUDU_CLIENT_BOSS_COUNT_KEY) != null) { - this.clientConf.setBossCount(Integer.valueOf(conf.get(KuduReceiverConstants.KUDU_CLIENT_BOSS_COUNT_KEY))); + bossCount = Integer.valueOf(conf.get(KuduReceiverConstants.KUDU_CLIENT_BOSS_COUNT_KEY)); + } else { + bossCount = null; } if (conf.get(KuduReceiverConstants.KUDU_CLIENT_WORKER_COUNT_KEY) != null) { - this.clientConf.setWorkerCount(Integer.valueOf(conf.get(KuduReceiverConstants.KUDU_CLIENT_WORKER_COUNT_KEY))); + workerCount = Integer.valueOf(conf.get(KuduReceiverConstants.KUDU_CLIENT_WORKER_COUNT_KEY)); + } else { + workerCount = null; } if (conf.get(KuduReceiverConstants.KUDU_CLIENT_STATISTICS_ENABLED_KEY) != null) { - this.clientConf.setIsStatisticsEnabled(Boolean.valueOf(conf.get(KuduReceiverConstants.KUDU_CLIENT_STATISTICS_ENABLED_KEY))); + isStatisticsEnabled = Boolean.valueOf(conf.get(KuduReceiverConstants.KUDU_CLIENT_STATISTICS_ENABLED_KEY)); + } else { + isStatisticsEnabled = null; } if (conf.get(KuduReceiverConstants.KUDU_CLIENT_TIMEOUT_ADMIN_OPERATION_KEY) != null) { - this.clientConf - .setAdminOperationTimeout(Long.valueOf(conf.get(KuduReceiverConstants.KUDU_CLIENT_TIMEOUT_ADMIN_OPERATION_KEY))); + adminOperationTimeout = Long.valueOf(conf.get(KuduReceiverConstants.KUDU_CLIENT_TIMEOUT_ADMIN_OPERATION_KEY)); + } else { + adminOperationTimeout = null; } if (conf.get(KuduReceiverConstants.KUDU_CLIENT_TIMEOUT_OPERATION_KEY) != null) { - this.clientConf - .setOperationTimeout(Long.valueOf(conf.get(KuduReceiverConstants.KUDU_CLIENT_TIMEOUT_OPERATION_KEY))); + 
operationTimeout = Long.valueOf(conf.get(KuduReceiverConstants.KUDU_CLIENT_TIMEOUT_OPERATION_KEY)); + } else { + operationTimeout = null; } if (conf.get(KuduReceiverConstants.KUDU_CLIENT_TIMEOUT_SOCKET_READ_KEY) != null) { - this.clientConf - .setSocketReadTimeout(Long.valueOf(conf.get(KuduReceiverConstants.KUDU_CLIENT_TIMEOUT_SOCKET_READ_KEY))); + socketReadTimeout = Long.valueOf(conf.get(KuduReceiverConstants.KUDU_CLIENT_TIMEOUT_SOCKET_READ_KEY)); + } else { + socketReadTimeout = null; } + + this.clientConf = new KuduClientConfiguration(masterHost, + masterPort, + workerCount, + bossCount, + isStatisticsEnabled, + adminOperationTimeout, + operationTimeout, + socketReadTimeout); + this.queue = new ArrayBlockingQueue<Span>(conf.getInt(KuduReceiverConstants.SPAN_BLOCKING_QUEUE_SIZE_KEY, KuduReceiverConstants.DEFAULT_SPAN_BLOCKING_QUEUE_SIZE)); - this.table = conf.get(KuduReceiverConstants.KUDU_TABLE_KEY, KuduReceiverConstants.DEFAULT_KUDU_TABLE); - this.column_span_id = conf.get(KuduReceiverConstants.KUDU_COLUMN_SPAN_ID_KEY, - KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_ID); - this.column_span = conf.get(KuduReceiverConstants.KUDU_COLUMN_SPAN_KEY, - KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN); - this.column_root_span = conf.get(KuduReceiverConstants.KUDU_COLUMN_ROOT_SPAN_KEY, - KuduReceiverConstants.DEFAULT_KUDU_COLUMN_ROOT_SPAN); - this.column_root_span_start_time = conf.get(KuduReceiverConstants.KUDU_COLUMN_ROOT_SPAN_START_TIME_KEY, - KuduReceiverConstants.DEFAULT_KUDU_COLUMN_ROOT_SPAN_START_TIME); + + this.table_span = conf.get(KuduReceiverConstants.KUDU_SPAN_TABLE_KEY, KuduReceiverConstants.DEFAULT_KUDU_SPAN_TABLE); + this.table_timeline= conf.get(KuduReceiverConstants.KUDU_SPAN_TIMELINE_ANNOTATION_TABLE_KEY, + KuduReceiverConstants.DEFAULT_KUDU_SPAN_TIMELINE_ANNOTATION_TABLE); + + this.column_span_trace_id = conf.get(KuduReceiverConstants.KUDU_COLUMN_SPAN_TRACE_ID_KEY, + KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_TRACE_ID); + 
this.column_span_start_time = conf.get(KuduReceiverConstants.KUDU_COLUMN_SPAN_START_TIME_KEY, + KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_START_TIME); + this.column_span_stop_time = conf.get(KuduReceiverConstants.KUDU_COLUMN_SPAN_STOP_TIME_KEY, + KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_STOP_TIME); + this.column_span_span_id = conf.get(KuduReceiverConstants.KUDU_COLUMN_SPAN_SPAN_ID_KEY, + KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_SPAN_ID); + this.column_span_process_id = conf.get(KuduReceiverConstants.KUDU_COLUMN_SPAN_PROCESS_ID_KEY, + KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_PROCESS_ID); + this.column_span_parent_id = conf.get(KuduReceiverConstants.KUDU_COLUMN_SPAN_PARENT_ID_KEY, + KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_PARENT_ID); + this.column_span_description = conf.get(KuduReceiverConstants.KUDU_COLUMN_SPAN_DESCRIPTION_KEY, + KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_DESCRIPTION); + this.column_span_parent = conf.get(KuduReceiverConstants.KUDU_COLUMN_SPAN_PARENT_KEY, + KuduReceiverConstants.DEFAULT_KUDU_COLUMN_SPAN_PARENT); + this.column_timeline_time = conf.get(KuduReceiverConstants.KUDU_COLUMN_TIMELINE_TIME_KEY, + KuduReceiverConstants.DEFAULT_KUDU_COLUMN_TIMELINE_TIME); + this.column_timeline_message = conf.get(KuduReceiverConstants.KUDU_COLUMN_TIMELINE_MESSAGE_KEY, + KuduReceiverConstants.DEFAULT_KUDU_COLUMN_TIMELINE_MESSAGE); + this.column_timeline_span_id = conf.get(KuduReceiverConstants.KUDU_COLUMN_TIMELINE_SPANID_KEY, + KuduReceiverConstants.DEFAULT_KUDU_COLUMN_TIMELINE_SPANID); + this.column_timeline_timeline_id = conf.get(KuduReceiverConstants.KUDU_COLUMN_TIMELINE_TIMELINEID_KEY, + KuduReceiverConstants.DEFAULT_KUDU_COLUMN_TIMELINE_TIMELINEID); + this.maxSpanBatchSize = conf.getInt(KuduReceiverConstants.MAX_SPAN_BATCH_SIZE_KEY, KuduReceiverConstants.DEFAULT_MAX_SPAN_BATCH_SIZE); if (this.service != null) { @@ -159,9 +219,6 @@ public class KuduSpanReceiver extends SpanReceiver { @Override public void run() { - 
SpanProtos.Span.Builder sbuilder = SpanProtos.Span.newBuilder(); - SpanProtos.TimelineAnnotation.Builder tlbuilder = - SpanProtos.TimelineAnnotation.newBuilder(); List<Span> dequeuedSpans = new ArrayList<Span>(maxSpanBatchSize); long errorCount = 0; while (running.get() || queue.size() > 0) { @@ -188,35 +245,35 @@ public class KuduSpanReceiver extends SpanReceiver { } try { for (Span span : dequeuedSpans) { - sbuilder.clear() - .setTraceId(span.getSpanId().getHigh()) - .setStart(span.getStartTimeMillis()) - .setStop(span.getStopTimeMillis()) - .setSpanId(span.getSpanId().getLow()) - .setProcessId(span.getTracerId()) - .setDescription(span.getDescription()); - + KuduTable tableSpan = client.openTable(table_span); + Insert spanInsert = tableSpan.newInsert(); + PartialRow spanRow = spanInsert.getRow(); + spanRow.addLong(column_span_trace_id,span.getSpanId().getHigh()); + spanRow.addLong(column_span_start_time,span.getStartTimeMillis()); + spanRow.addLong(column_span_stop_time,span.getStopTimeMillis()); + spanRow.addLong(column_span_span_id,span.getSpanId().getLow()); + spanRow.addString(column_span_process_id,span.getTracerId()); if (span.getParents().length == 0) { - sbuilder.setParentId(0); + spanRow.addLong(column_span_parent_id,0); + spanRow.addBoolean(column_span_parent,false); } else if (span.getParents().length > 0) { - sbuilder.setParentId(span.getParents()[0].getLow()); + spanRow.addLong(column_span_parent_id,span.getParents()[0].getLow()); + spanRow.addBoolean(column_span_parent,true); } + spanRow.addString(column_span_description,span.getDescription()); + this.session.apply(spanInsert); + long annotationCounter = 0; for (TimelineAnnotation ta : span.getTimelineAnnotations()) { - sbuilder.addTimeline(tlbuilder.clear() - .setTime(ta.getTime()) - .setMessage(ta.getMessage()) - .build()); - } - KuduTable tableRef = client.openTable(table); - Insert insert = tableRef.newInsert(); - PartialRow row = insert.getRow(); - row.addBinary(column_span_id, 
Bytes.fromLong(span.getSpanId().getHigh())); - row.addBinary(column_span, sbuilder.build().toByteArray()); - if (span.getParents().length == 0) { - row.addBinary(column_root_span_start_time, Bytes.fromLong(span.getStartTimeMillis())); - row.addBinary(column_root_span, sbuilder.build().toByteArray()); + annotationCounter++; + KuduTable tableTimeline = client.openTable(table_timeline); + Insert timelineInsert = tableTimeline.newInsert(); + PartialRow timelineRow = timelineInsert.getRow(); + timelineRow.addLong(column_timeline_timeline_id,span.getSpanId().getHigh()+annotationCounter); + timelineRow.addLong(column_timeline_time,ta.getTime()); + timelineRow.addString(column_timeline_message,ta.getMessage()); + timelineRow.addLong(column_timeline_span_id,span.getSpanId().getHigh()); + this.session.apply(timelineInsert); } - this.session.apply(insert); } dequeuedSpans.clear(); errorCount = 0;
