This is an automated email from the ASF dual-hosted git repository.
cwylie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 5c842e7d37d add thrift inputformat implementation (#19111)
5c842e7d37d is described below
commit 5c842e7d37d463982211e826f19aa74ea7e22885
Author: Clint Wylie <[email protected]>
AuthorDate: Tue Mar 10 14:42:24 2026 -0700
add thrift inputformat implementation (#19111)
---
docs/development/extensions-contrib/thrift.md | 116 ++++-
embedded-tests/pom.xml | 19 +-
.../indexing/StreamIndexDataFormatsTestBase.java | 29 ++
.../embedded/tools/ThriftEventSerializer.java | 111 +++++
.../embedded/tools/WikipediaThriftEvent.java | 505 +++++++++++++++++++++
extensions-contrib/thrift-extensions/pom.xml | 15 +
.../data/input/thrift/ThriftExtensionsModule.java | 3 +-
.../druid/data/input/thrift/ThriftInputFormat.java | 108 +++++
.../druid/data/input/thrift/ThriftReader.java | 148 ++++++
.../data/input/thrift/ThriftInputFormatTest.java | 167 +++++++
website/.spelling | 1 +
11 files changed, 1205 insertions(+), 17 deletions(-)
diff --git a/docs/development/extensions-contrib/thrift.md b/docs/development/extensions-contrib/thrift.md
index 31489827090..8e15c154b72 100644
--- a/docs/development/extensions-contrib/thrift.md
+++ b/docs/development/extensions-contrib/thrift.md
@@ -25,27 +25,114 @@ title: "Thrift"
To use this Apache Druid extension, [include](../../configuration/extensions.md#loading-extensions) `druid-thrift-extensions` in the extensions load list.
-This extension enables Druid to ingest thrift compact data online (`ByteBuffer`) and offline (SequenceFile of type `<Writable, BytesWritable>` or LzoThriftBlock File).
+This extension enables Druid to ingest Thrift-encoded data from streaming sources such as Kafka and Kinesis, as well as from Hadoop batch jobs reading SequenceFile or LzoThriftBlock files. The binary, compact, and JSON Thrift wire protocols are all supported, with optional Base64 encoding.
To use another version of Thrift, change the dependency in the extension's pom file and compile it yourself.
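As a sketch, pinning a different libthrift version in the extension's `pom.xml` might look like the following (the version shown is illustrative; pick the one you need before rebuilding):

```xml
<!-- Illustrative: override the libthrift version before rebuilding the extension -->
<dependency>
  <groupId>org.apache.thrift</groupId>
  <artifactId>libthrift</artifactId>
  <version>0.13.0</version>
</dependency>
```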
+## Thrift input format
+
+Use the Thrift [input format](../../ingestion/data-formats.md#input-format) to ingest Thrift-encoded data from streaming sources such as Kafka and Kinesis. It supports `flattenSpec` for extracting fields from nested Thrift structs using JSONPath expressions.
+
+| Field | Type | Description | Required |
+|-------|------|-------------|----------|
+| type | String | Must be `thrift` | yes |
+| thriftClass | String | Fully qualified class name of the Thrift-generated `TBase` class to deserialize into. | yes |
+| thriftJar | String | Path to a JAR file containing the Thrift class. If not provided, the class is looked up from the classpath. | no |
+| flattenSpec | JSON Object | Specifies flattening of nested Thrift structs. See [Flattening nested data](../../ingestion/data-formats.md#flattenspec) for details. | no |
+
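For reference, a minimal `inputFormat` object using these fields might look like this (the class name and jar path are illustrative):

```json
{
  "type": "thrift",
  "thriftClass": "com.example.druid.Book",
  "thriftJar": "/opt/druid/extensions/druid-thrift-extensions/book.jar"
}
```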
+### Example: Kafka ingestion
+
+Consider the following Thrift schema definition:
+
+```
+namespace java com.example.druid
+
+struct Author {
+ 1: string firstName;
+ 2: string lastName;
+}
+
+struct Book {
+ 1: string date;
+ 2: double price;
+ 3: string title;
+ 4: Author author;
+}
+```
+
+Compile it to produce `com.example.druid.Book` (and `com.example.druid.Author`) and make the resulting JAR available on the classpath of your Druid processes, or reference it via `thriftJar`.
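As a rough sketch, generating and packaging those classes with the Thrift compiler might look like this (file names and paths are illustrative, and a `libthrift` JAR matching your Thrift version is assumed to be available):

```
# Generate Java sources from the schema above (saved as book.thrift)
thrift --gen java book.thrift

# Compile the generated sources against libthrift and package them into a JAR
javac -cp libthrift.jar -d classes gen-java/com/example/druid/*.java
jar cf book.jar -C classes .
```

The resulting `book.jar` can then be placed on the Druid classpath or referenced via `thriftJar`.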
+
+The following Kafka supervisor spec ingests compact-encoded `Book` messages, using a `flattenSpec` to extract the nested `author.lastName` field:
+```json
+{
+ "type": "kafka",
+ "spec": {
+ "dataSchema": {
+ "dataSource": "books",
+ "timestampSpec": {
+ "column": "date",
+ "format": "auto"
+ },
+ "dimensionsSpec": {
+ "dimensions": [
+ "title",
+ "lastName"
+ ]
+ },
+ "granularitySpec": {
+ "type": "uniform",
+ "segmentGranularity": "DAY",
+ "queryGranularity": "NONE"
+ }
+ },
+ "tuningConfig": {
+ "type": "kafka"
+ },
+ "ioConfig": {
+ "type": "kafka",
+ "consumerProperties": {
+ "bootstrap.servers": "localhost:9092"
+ },
+ "topic": "books",
+ "inputFormat": {
+ "type": "thrift",
+ "thriftClass": "com.example.druid.Book",
+ "flattenSpec": {
+ "useFieldDiscovery": true,
+ "fields": [
+ {
+ "type": "path",
+ "name": "lastName",
+ "expr": "$.author.lastName"
+ }
+ ]
+ }
+ },
+ "taskCount": 1,
+ "replicas": 1,
+ "taskDuration": "PT1H"
+ }
+ }
+}
+```
+
## LZO Support
If you plan to read LZO-compressed Thrift files, you will need to download version 0.4.19 of the [hadoop-lzo JAR](https://mvnrepository.com/artifact/com.hadoop.gplcompression/hadoop-lzo/0.4.19) and place it in your `extensions/druid-thrift-extensions` directory.
-## Thrift Parser
-
+## Thrift parser (deprecated)
-| Field | Type | Description | Required |
-| ----------- | ----------- | ---------------------------------------- | -------- |
-| type | String | This should say `thrift` | yes |
-| parseSpec | JSON Object | Specifies the timestamp and dimensions of the data. Should be a JSON parseSpec. | yes |
-| thriftJar | String | path of thrift jar, if not provided, it will try to find the thrift class in classpath. Thrift jar in batch ingestion should be uploaded to HDFS first and configure `jobProperties` with `"tmpjars":"/path/to/your/thrift.jar"` | no |
-| thriftClass | String | classname of thrift | yes |
+`ThriftInputRowParser` is the legacy parser-based approach to Thrift ingestion. It is deprecated in favor of `ThriftInputFormat` above and will be removed in a future release.
-- Batch Ingestion example - `inputFormat` and `tmpjars` should be set.
+| Field | Type | Description | Required |
+|-------|------|-------------|----------|
+| type | String | Must be `thrift` | yes |
+| parseSpec | JSON Object | Specifies the timestamp and dimensions of the data. Should be a JSON parseSpec. | yes |
+| thriftJar | String | Path to the Thrift JAR. If not provided, the class is looked up from the classpath. For Hadoop batch ingestion the JAR should be uploaded to HDFS first and `jobProperties` configured with `"tmpjars":"/path/to/your/thrift.jar"`. | no |
+| thriftClass | String | Fully qualified class name of the Thrift-generated class. | yes |
-This is for batch ingestion using the HadoopDruidIndexer. The inputFormat of inputSpec in ioConfig could be one of `"org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat"` and `com.twitter.elephantbird.mapreduce.input.LzoThriftBlockInputFormat`. Be careful, when `LzoThriftBlockInputFormat` is used, thrift class must be provided twice.
+Batch ingestion example using the HadoopDruidIndexer. The `inputFormat` of `inputSpec` in `ioConfig` can be either `"org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat"` or `"com.twitter.elephantbird.mapreduce.input.LzoThriftBlockInputFormat"`. When using `LzoThriftBlockInputFormat`, the Thrift class must be provided twice.
```json
{
@@ -60,7 +147,8 @@ This is for batch ingestion using the HadoopDruidIndexer. The inputFormat of inp
"protocol": "compact",
"parseSpec": {
"format": "json",
- ...
+ "timestampSpec": {},
+ "dimensionsSpec": {}
}
},
"metricsSpec": [],
@@ -71,15 +159,13 @@ This is for batch ingestion using the HadoopDruidIndexer. The inputFormat of inp
"inputSpec": {
"type": "static",
        "inputFormat": "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat",
-          // "inputFormat": "com.twitter.elephantbird.mapreduce.input.LzoThriftBlockInputFormat",
"paths": "/user/to/some/book.seq"
}
},
"tuningConfig": {
"type": "hadoop",
"jobProperties": {
- "tmpjars":"/user/h_user_profile/du00/druid/test/book.jar",
-        // "elephantbird.class.for.MultiInputFormat" : "${YOUR_THRIFT_CLASS_NAME}"
+ "tmpjars": "/user/h_user_profile/du00/druid/test/book.jar"
}
}
}
diff --git a/embedded-tests/pom.xml b/embedded-tests/pom.xml
index 75e33d0cae0..5e3452bf9e4 100644
--- a/embedded-tests/pom.xml
+++ b/embedded-tests/pom.xml
@@ -137,6 +137,18 @@
<version>${project.parent.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.druid.extensions.contrib</groupId>
+ <artifactId>druid-thrift-extensions</artifactId>
+ <version>${project.parent.version}</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>com.googlecode.json-simple</groupId>
+ <artifactId>json-simple</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
<dependency>
<groupId>joda-time</groupId>
@@ -590,7 +602,12 @@
<version>5.5</version>
<scope>test</scope>
</dependency>
-
+ <dependency>
+ <groupId>org.apache.thrift</groupId>
+ <artifactId>libthrift</artifactId>
+ <version>0.13.0</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/StreamIndexDataFormatsTestBase.java
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/StreamIndexDataFormatsTestBase.java
index 18d3f512374..031632f5989 100644
---
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/StreamIndexDataFormatsTestBase.java
+++
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/StreamIndexDataFormatsTestBase.java
@@ -42,6 +42,8 @@ import
org.apache.druid.data.input.protobuf.ProtobufExtensionsModule;
import org.apache.druid.data.input.protobuf.ProtobufInputFormat;
import org.apache.druid.data.input.protobuf.ProtobufInputRowParser;
import
org.apache.druid.data.input.protobuf.SchemaRegistryBasedProtobufBytesDecoder;
+import org.apache.druid.data.input.thrift.ThriftExtensionsModule;
+import org.apache.druid.data.input.thrift.ThriftInputFormat;
import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.apache.druid.query.DruidMetrics;
@@ -54,6 +56,8 @@ import org.apache.druid.testing.embedded.EmbeddedIndexer;
import org.apache.druid.testing.embedded.EmbeddedOverlord;
import org.apache.druid.testing.embedded.StreamIngestResource;
import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase;
+import org.apache.druid.testing.embedded.tools.ThriftEventSerializer;
+import org.apache.druid.testing.embedded.tools.WikipediaThriftEvent;
import org.apache.druid.testing.tools.AvroEventSerializer;
import org.apache.druid.testing.tools.AvroSchemaRegistryEventSerializer;
import org.apache.druid.testing.tools.CsvEventSerializer;
@@ -81,6 +85,8 @@ import java.util.Map;
* <li>CSV</li>
* <li>JSON</li>
* <li>Protobuf (with and without schema registry)</li>
+ * <li>Thrift</li>
+ * <li>TSV</li>
* </ul>
*
* This tests both InputFormat and Parser. Parser is deprecated for Streaming
Ingestion,
@@ -138,6 +144,7 @@ public abstract class StreamIndexDataFormatsTestBase
extends EmbeddedClusterTest
coordinator.addProperty("druid.manager.segments.useIncrementalCache",
"ifSynced");
cluster.addExtension(ProtobufExtensionsModule.class)
.addExtension(AvroExtensionsModule.class)
+ .addExtension(ThriftExtensionsModule.class)
.useLatchableEmitter()
.addCommonProperty("druid.monitoring.emissionPeriod", "PT0.1s")
.addResource(streamResource)
@@ -543,6 +550,28 @@ public abstract class StreamIndexDataFormatsTestBase
extends EmbeddedClusterTest
stopSupervisor(supervisorSpec);
}
+ @Test
+ @Timeout(30)
+ public void test_thriftDataFormat()
+ {
+ streamResource.createTopicWithPartitions(dataSource, 3);
+ EventSerializer serializer = new ThriftEventSerializer();
+ int recordCount = generateStreamAndPublish(dataSource, serializer, false);
+
+ ThriftInputFormat inputFormat = new ThriftInputFormat(
+ new JSONPathSpec(true, null),
+ null,
+ WikipediaThriftEvent.class.getName()
+ );
+
+    SupervisorSpec supervisorSpec = createSupervisor(dataSource, dataSource, inputFormat);
+    final String supervisorId = cluster.callApi().postSupervisor(supervisorSpec);
+ Assertions.assertEquals(dataSource, supervisorId);
+
+ waitForDataAndVerifyIngestedEvents(dataSource, recordCount);
+ stopSupervisor(supervisorSpec);
+ }
+
  private void waitForDataAndVerifyIngestedEvents(String dataSource, int expectedCount)
{
// Wait for the task to succeed
diff --git
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/tools/ThriftEventSerializer.java
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/tools/ThriftEventSerializer.java
new file mode 100644
index 00000000000..5f5767c5192
--- /dev/null
+++
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/tools/ThriftEventSerializer.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.embedded.tools;
+
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.testing.tools.EventSerializer;
+import org.apache.thrift.TException;
+import org.apache.thrift.TSerializer;
+import org.apache.thrift.protocol.TCompactProtocol;
+
+import java.util.List;
+
+/**
+ * {@link EventSerializer} that serializes Wikipedia stream events as Thrift compact-encoded
+ * {@link WikipediaThriftEvent} objects. All field values are converted to strings so that
+ * integer values like {@code added}, {@code deleted}, and {@code delta} are serialized
+ * uniformly without requiring a mixed-type Thrift struct.
+ */
+public class ThriftEventSerializer implements EventSerializer
+{
+  private static final TSerializer SERIALIZER = new TSerializer(new TCompactProtocol.Factory());
+
+ @Override
+ public byte[] serialize(List<Pair<String, Object>> event)
+ {
+ WikipediaThriftEvent wikiEvent = new WikipediaThriftEvent();
+ for (Pair<String, Object> pair : event) {
+ String value = pair.rhs == null ? null : String.valueOf(pair.rhs);
+ switch (pair.lhs) {
+ case "timestamp":
+ wikiEvent.timestamp = value;
+ break;
+ case "page":
+ wikiEvent.page = value;
+ break;
+ case "language":
+ wikiEvent.language = value;
+ break;
+ case "user":
+ wikiEvent.user = value;
+ break;
+ case "unpatrolled":
+ wikiEvent.unpatrolled = value;
+ break;
+ case "newPage":
+ wikiEvent.newPage = value;
+ break;
+ case "robot":
+ wikiEvent.robot = value;
+ break;
+ case "anonymous":
+ wikiEvent.anonymous = value;
+ break;
+ case "namespace":
+ wikiEvent.namespace = value;
+ break;
+ case "continent":
+ wikiEvent.continent = value;
+ break;
+ case "country":
+ wikiEvent.country = value;
+ break;
+ case "region":
+ wikiEvent.region = value;
+ break;
+ case "city":
+ wikiEvent.city = value;
+ break;
+ case "added":
+ wikiEvent.added = value;
+ break;
+ case "deleted":
+ wikiEvent.deleted = value;
+ break;
+ case "delta":
+ wikiEvent.delta = value;
+ break;
+ default:
+ break;
+ }
+ }
+ try {
+ return SERIALIZER.serialize(wikiEvent);
+ }
+ catch (TException e) {
+      throw new RuntimeException("Failed to serialize WikipediaThriftEvent", e);
+ }
+ }
+
+ @Override
+ public void close()
+ {
+ }
+}
diff --git
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/tools/WikipediaThriftEvent.java
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/tools/WikipediaThriftEvent.java
new file mode 100644
index 00000000000..c2aabf98d65
--- /dev/null
+++
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/tools/WikipediaThriftEvent.java
@@ -0,0 +1,505 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.embedded.tools;
+
+import org.apache.thrift.TBase;
+import org.apache.thrift.TException;
+import org.apache.thrift.TFieldIdEnum;
+import org.apache.thrift.protocol.TField;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.protocol.TProtocolUtil;
+import org.apache.thrift.protocol.TStruct;
+import org.apache.thrift.protocol.TType;
+
+import java.util.EnumMap;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * A hand-written Thrift struct representing a Wikipedia stream event for use in
+ * embedded stream ingestion tests. All fields are strings so that
+ * {@code TSimpleJSONProtocol} can serialize them without special handling.
+ */
+public class WikipediaThriftEvent
+ implements TBase<WikipediaThriftEvent, WikipediaThriftEvent._Fields>,
java.io.Serializable
+{
+ private static final long serialVersionUID = 1L;
+
+ private static final TStruct STRUCT_DESC = new
TStruct("WikipediaThriftEvent");
+
+ private static final TField TIMESTAMP_FIELD_DESC = new TField("timestamp",
TType.STRING, (short) 1);
+ private static final TField PAGE_FIELD_DESC = new TField("page",
TType.STRING, (short) 2);
+ private static final TField LANGUAGE_FIELD_DESC = new TField("language",
TType.STRING, (short) 3);
+ private static final TField USER_FIELD_DESC = new TField("user",
TType.STRING, (short) 4);
+ private static final TField UNPATROLLED_FIELD_DESC = new
TField("unpatrolled", TType.STRING, (short) 5);
+ private static final TField NEW_PAGE_FIELD_DESC = new TField("newPage",
TType.STRING, (short) 6);
+ private static final TField ROBOT_FIELD_DESC = new TField("robot",
TType.STRING, (short) 7);
+ private static final TField ANONYMOUS_FIELD_DESC = new TField("anonymous",
TType.STRING, (short) 8);
+ private static final TField NAMESPACE_FIELD_DESC = new TField("namespace",
TType.STRING, (short) 9);
+ private static final TField CONTINENT_FIELD_DESC = new TField("continent",
TType.STRING, (short) 10);
+ private static final TField COUNTRY_FIELD_DESC = new TField("country",
TType.STRING, (short) 11);
+ private static final TField REGION_FIELD_DESC = new TField("region",
TType.STRING, (short) 12);
+ private static final TField CITY_FIELD_DESC = new TField("city",
TType.STRING, (short) 13);
+ private static final TField ADDED_FIELD_DESC = new TField("added",
TType.STRING, (short) 14);
+ private static final TField DELETED_FIELD_DESC = new TField("deleted",
TType.STRING, (short) 15);
+ private static final TField DELTA_FIELD_DESC = new TField("delta",
TType.STRING, (short) 16);
+
+ public String timestamp;
+ public String page;
+ public String language;
+ public String user;
+ public String unpatrolled;
+ public String newPage;
+ public String robot;
+ public String anonymous;
+ public String namespace;
+ public String continent;
+ public String country;
+ public String region;
+ public String city;
+ public String added;
+ public String deleted;
+ public String delta;
+
+ public enum _Fields implements TFieldIdEnum
+ {
+ TIMESTAMP((short) 1, "timestamp"),
+ PAGE((short) 2, "page"),
+ LANGUAGE((short) 3, "language"),
+ USER((short) 4, "user"),
+ UNPATROLLED((short) 5, "unpatrolled"),
+ NEW_PAGE((short) 6, "newPage"),
+ ROBOT((short) 7, "robot"),
+ ANONYMOUS((short) 8, "anonymous"),
+ NAMESPACE((short) 9, "namespace"),
+ CONTINENT((short) 10, "continent"),
+ COUNTRY((short) 11, "country"),
+ REGION((short) 12, "region"),
+ CITY((short) 13, "city"),
+ ADDED((short) 14, "added"),
+ DELETED((short) 15, "deleted"),
+ DELTA((short) 16, "delta");
+
+ private static final Map<String, _Fields> BY_NAME = new HashMap<>();
+
+ static {
+ for (_Fields field : EnumSet.allOf(_Fields.class)) {
+ BY_NAME.put(field.getFieldName(), field);
+ }
+ }
+
+ public static _Fields findByThriftId(int fieldId)
+ {
+ switch (fieldId) {
+ case 1:
+ return TIMESTAMP;
+ case 2:
+ return PAGE;
+ case 3:
+ return LANGUAGE;
+ case 4:
+ return USER;
+ case 5:
+ return UNPATROLLED;
+ case 6:
+ return NEW_PAGE;
+ case 7:
+ return ROBOT;
+ case 8:
+ return ANONYMOUS;
+ case 9:
+ return NAMESPACE;
+ case 10:
+ return CONTINENT;
+ case 11:
+ return COUNTRY;
+ case 12:
+ return REGION;
+ case 13:
+ return CITY;
+ case 14:
+ return ADDED;
+ case 15:
+ return DELETED;
+ case 16:
+ return DELTA;
+ default:
+ return null;
+ }
+ }
+
+ public static _Fields findByName(String name)
+ {
+ return BY_NAME.get(name);
+ }
+
+ private final short thriftId;
+ private final String fieldName;
+
+ _Fields(short thriftId, String fieldName)
+ {
+ this.thriftId = thriftId;
+ this.fieldName = fieldName;
+ }
+
+ @Override
+ public short getThriftFieldId()
+ {
+ return thriftId;
+ }
+
+ @Override
+ public String getFieldName()
+ {
+ return fieldName;
+ }
+ }
+
+ public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData>
META_DATA_MAP;
+
+ static {
+ Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new
EnumMap<>(_Fields.class);
+ for (_Fields field : _Fields.values()) {
+ tmpMap.put(field, new org.apache.thrift.meta_data.FieldMetaData(
+ field.getFieldName(),
+ org.apache.thrift.TFieldRequirementType.DEFAULT,
+ new org.apache.thrift.meta_data.FieldValueMetaData(TType.STRING)
+ ));
+ }
+ META_DATA_MAP = java.util.Collections.unmodifiableMap(tmpMap);
+
org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(WikipediaThriftEvent.class,
META_DATA_MAP);
+ }
+
+ public WikipediaThriftEvent()
+ {
+ }
+
+ public WikipediaThriftEvent(WikipediaThriftEvent other)
+ {
+ this.timestamp = other.timestamp;
+ this.page = other.page;
+ this.language = other.language;
+ this.user = other.user;
+ this.unpatrolled = other.unpatrolled;
+ this.newPage = other.newPage;
+ this.robot = other.robot;
+ this.anonymous = other.anonymous;
+ this.namespace = other.namespace;
+ this.continent = other.continent;
+ this.country = other.country;
+ this.region = other.region;
+ this.city = other.city;
+ this.added = other.added;
+ this.deleted = other.deleted;
+ this.delta = other.delta;
+ }
+
+ @Override
+ public WikipediaThriftEvent deepCopy()
+ {
+ return new WikipediaThriftEvent(this);
+ }
+
+ @Override
+ public void clear()
+ {
+ timestamp = null;
+ page = null;
+ language = null;
+ user = null;
+ unpatrolled = null;
+ newPage = null;
+ robot = null;
+ anonymous = null;
+ namespace = null;
+ continent = null;
+ country = null;
+ region = null;
+ city = null;
+ added = null;
+ deleted = null;
+ delta = null;
+ }
+
+ @Override
+ public _Fields fieldForId(int fieldId)
+ {
+ return _Fields.findByThriftId(fieldId);
+ }
+
+ @Override
+ public boolean isSet(_Fields field)
+ {
+ if (field == null) {
+ throw new IllegalArgumentException();
+ }
+ return getFieldValue(field) != null;
+ }
+
+ @Override
+ public Object getFieldValue(_Fields field)
+ {
+ switch (field) {
+ case TIMESTAMP:
+ return timestamp;
+ case PAGE:
+ return page;
+ case LANGUAGE:
+ return language;
+ case USER:
+ return user;
+ case UNPATROLLED:
+ return unpatrolled;
+ case NEW_PAGE:
+ return newPage;
+ case ROBOT:
+ return robot;
+ case ANONYMOUS:
+ return anonymous;
+ case NAMESPACE:
+ return namespace;
+ case CONTINENT:
+ return continent;
+ case COUNTRY:
+ return country;
+ case REGION:
+ return region;
+ case CITY:
+ return city;
+ case ADDED:
+ return added;
+ case DELETED:
+ return deleted;
+ case DELTA:
+ return delta;
+ default:
+ throw new IllegalStateException();
+ }
+ }
+
+ @Override
+ public void setFieldValue(_Fields field, Object value)
+ {
+ switch (field) {
+ case TIMESTAMP:
+ timestamp = (String) value;
+ break;
+ case PAGE:
+ page = (String) value;
+ break;
+ case LANGUAGE:
+ language = (String) value;
+ break;
+ case USER:
+ user = (String) value;
+ break;
+ case UNPATROLLED:
+ unpatrolled = (String) value;
+ break;
+ case NEW_PAGE:
+ newPage = (String) value;
+ break;
+ case ROBOT:
+ robot = (String) value;
+ break;
+ case ANONYMOUS:
+ anonymous = (String) value;
+ break;
+ case NAMESPACE:
+ namespace = (String) value;
+ break;
+ case CONTINENT:
+ continent = (String) value;
+ break;
+ case COUNTRY:
+ country = (String) value;
+ break;
+ case REGION:
+ region = (String) value;
+ break;
+ case CITY:
+ city = (String) value;
+ break;
+ case ADDED:
+ added = (String) value;
+ break;
+ case DELETED:
+ deleted = (String) value;
+ break;
+ case DELTA:
+ delta = (String) value;
+ break;
+ default:
+ throw new IllegalStateException();
+ }
+ }
+
+ @Override
+ public void read(TProtocol iprot) throws TException
+ {
+ TField field;
+ iprot.readStructBegin();
+ while (true) {
+ field = iprot.readFieldBegin();
+ if (field.type == TType.STOP) {
+ break;
+ }
+ if (field.type == TType.STRING) {
+ switch (field.id) {
+ case 1:
+ timestamp = iprot.readString();
+ break;
+ case 2:
+ page = iprot.readString();
+ break;
+ case 3:
+ language = iprot.readString();
+ break;
+ case 4:
+ user = iprot.readString();
+ break;
+ case 5:
+ unpatrolled = iprot.readString();
+ break;
+ case 6:
+ newPage = iprot.readString();
+ break;
+ case 7:
+ robot = iprot.readString();
+ break;
+ case 8:
+ anonymous = iprot.readString();
+ break;
+ case 9:
+ namespace = iprot.readString();
+ break;
+ case 10:
+ continent = iprot.readString();
+ break;
+ case 11:
+ country = iprot.readString();
+ break;
+ case 12:
+ region = iprot.readString();
+ break;
+ case 13:
+ city = iprot.readString();
+ break;
+ case 14:
+ added = iprot.readString();
+ break;
+ case 15:
+ deleted = iprot.readString();
+ break;
+ case 16:
+ delta = iprot.readString();
+ break;
+ default:
+ TProtocolUtil.skip(iprot, field.type);
+ break;
+ }
+ } else {
+ TProtocolUtil.skip(iprot, field.type);
+ }
+ iprot.readFieldEnd();
+ }
+ iprot.readStructEnd();
+ validate();
+ }
+
+ @Override
+ public void write(TProtocol oprot) throws TException
+ {
+ validate();
+ oprot.writeStructBegin(STRUCT_DESC);
+ writeStringField(oprot, TIMESTAMP_FIELD_DESC, timestamp);
+ writeStringField(oprot, PAGE_FIELD_DESC, page);
+ writeStringField(oprot, LANGUAGE_FIELD_DESC, language);
+ writeStringField(oprot, USER_FIELD_DESC, user);
+ writeStringField(oprot, UNPATROLLED_FIELD_DESC, unpatrolled);
+ writeStringField(oprot, NEW_PAGE_FIELD_DESC, newPage);
+ writeStringField(oprot, ROBOT_FIELD_DESC, robot);
+ writeStringField(oprot, ANONYMOUS_FIELD_DESC, anonymous);
+ writeStringField(oprot, NAMESPACE_FIELD_DESC, namespace);
+ writeStringField(oprot, CONTINENT_FIELD_DESC, continent);
+ writeStringField(oprot, COUNTRY_FIELD_DESC, country);
+ writeStringField(oprot, REGION_FIELD_DESC, region);
+ writeStringField(oprot, CITY_FIELD_DESC, city);
+ writeStringField(oprot, ADDED_FIELD_DESC, added);
+ writeStringField(oprot, DELETED_FIELD_DESC, deleted);
+ writeStringField(oprot, DELTA_FIELD_DESC, delta);
+ oprot.writeFieldStop();
+ oprot.writeStructEnd();
+ }
+
+ private static void writeStringField(TProtocol oprot, TField desc, String
value) throws TException
+ {
+ if (value != null) {
+ oprot.writeFieldBegin(desc);
+ oprot.writeString(value);
+ oprot.writeFieldEnd();
+ }
+ }
+
+ public void validate() throws TException
+ {
+ // no required fields
+ }
+
+ @Override
+ public int compareTo(WikipediaThriftEvent other)
+ {
+ if (!getClass().equals(other.getClass())) {
+ return getClass().getName().compareTo(other.getClass().getName());
+ }
+ return Objects.compare(timestamp, other.timestamp,
java.util.Comparator.nullsFirst(java.util.Comparator.naturalOrder()));
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ WikipediaThriftEvent that = (WikipediaThriftEvent) o;
+ return Objects.equals(timestamp, that.timestamp)
+ && Objects.equals(page, that.page)
+ && Objects.equals(language, that.language)
+ && Objects.equals(user, that.user)
+ && Objects.equals(namespace, that.namespace);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(timestamp, page, language, user, namespace);
+ }
+
+ @Override
+ public String toString()
+ {
+ return "WikipediaThriftEvent{timestamp=" + timestamp + ", page=" + page +
"}";
+ }
+}
diff --git a/extensions-contrib/thrift-extensions/pom.xml
b/extensions-contrib/thrift-extensions/pom.xml
index e212ddf4907..9e39c0b5d8b 100644
--- a/extensions-contrib/thrift-extensions/pom.xml
+++ b/extensions-contrib/thrift-extensions/pom.xml
@@ -115,6 +115,16 @@
<artifactId>jackson-databind</artifactId>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>com.google.code.findbugs</groupId>
+ <artifactId>jsr305</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ <scope>provided</scope>
+ </dependency>
<dependency>
<groupId>com.twitter</groupId>
<artifactId>scrooge-core_2.11</artifactId>
@@ -136,6 +146,11 @@
<artifactId>hamcrest</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>nl.jqno.equalsverifier</groupId>
+ <artifactId>equalsverifier</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<profiles>
<profile>
diff --git
a/extensions-contrib/thrift-extensions/src/main/java/org/apache/druid/data/input/thrift/ThriftExtensionsModule.java
b/extensions-contrib/thrift-extensions/src/main/java/org/apache/druid/data/input/thrift/ThriftExtensionsModule.java
index bea2a913e39..4ffba3a7e24 100644
---
a/extensions-contrib/thrift-extensions/src/main/java/org/apache/druid/data/input/thrift/ThriftExtensionsModule.java
+++
b/extensions-contrib/thrift-extensions/src/main/java/org/apache/druid/data/input/thrift/ThriftExtensionsModule.java
@@ -37,7 +37,8 @@ public class ThriftExtensionsModule implements DruidModule
return Collections.singletonList(
new SimpleModule("ThriftInputRowParserModule")
.registerSubtypes(
- new NamedType(ThriftInputRowParser.class, "thrift")
+ new NamedType(ThriftInputRowParser.class, "thrift"),
+ new NamedType(ThriftInputFormat.class, "thrift")
)
);
}
diff --git
a/extensions-contrib/thrift-extensions/src/main/java/org/apache/druid/data/input/thrift/ThriftInputFormat.java
b/extensions-contrib/thrift-extensions/src/main/java/org/apache/druid/data/input/thrift/ThriftInputFormat.java
new file mode 100644
index 00000000000..fdf638fa7e3
--- /dev/null
+++
b/extensions-contrib/thrift-extensions/src/main/java/org/apache/druid/data/input/thrift/ThriftInputFormat.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.thrift;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.data.input.InputEntity;
+import org.apache.druid.data.input.InputEntityReader;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.data.input.impl.NestedInputFormat;
+import org.apache.druid.error.InvalidInput;
+import org.apache.druid.java.util.common.parsers.JSONPathSpec;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.util.Objects;
+
+/**
+ * {@link org.apache.druid.data.input.InputFormat} for Thrift-encoded data. Supports binary, compact, and JSON
+ * Thrift protocols (with optional Base64 encoding).
+ * <p>
+ * The Thrift class can be provided either from the classpath or from an external jar file via {@code thriftJar}.
+ */
+public class ThriftInputFormat extends NestedInputFormat
+{
+ private final String thriftJar;
+ private final String thriftClass;
+
+ @JsonCreator
+ public ThriftInputFormat(
+ @JsonProperty("flattenSpec") @Nullable JSONPathSpec flattenSpec,
+ @JsonProperty("thriftJar") @Nullable String thriftJar,
+ @JsonProperty("thriftClass") String thriftClass
+ )
+ {
+ super(flattenSpec);
+ this.thriftJar = thriftJar;
+ InvalidInput.conditionalException(thriftClass != null, "thriftClass must not be null");
+ this.thriftClass = thriftClass;
+ }
+
+ @Nullable
+ @JsonProperty("thriftJar")
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ public String getThriftJar()
+ {
+ return thriftJar;
+ }
+
+ @JsonProperty("thriftClass")
+ public String getThriftClass()
+ {
+ return thriftClass;
+ }
+
+ @Override
+ public boolean isSplittable()
+ {
+ return false;
+ }
+
+ @Override
+ public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory)
+ {
+ return new ThriftReader(inputRowSchema, source, thriftJar, thriftClass, getFlattenSpec());
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ if (!super.equals(o)) {
+ return false;
+ }
+ ThriftInputFormat that = (ThriftInputFormat) o;
+ return Objects.equals(thriftJar, that.thriftJar) &&
+ Objects.equals(thriftClass, that.thriftClass);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(super.hashCode(), thriftJar, thriftClass);
+ }
+}
diff --git a/extensions-contrib/thrift-extensions/src/main/java/org/apache/druid/data/input/thrift/ThriftReader.java b/extensions-contrib/thrift-extensions/src/main/java/org/apache/druid/data/input/thrift/ThriftReader.java
new file mode 100644
index 00000000000..d4b3c0bdc82
--- /dev/null
+++ b/extensions-contrib/thrift-extensions/src/main/java/org/apache/druid/data/input/thrift/ThriftReader.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.thrift;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Iterators;
+import org.apache.commons.io.IOUtils;
+import org.apache.druid.data.input.InputEntity;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.data.input.IntermediateRowParsingReader;
+import org.apache.druid.data.input.impl.MapInputRowParser;
+import org.apache.druid.java.util.common.CloseableIterators;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.java.util.common.parsers.JSONFlattenerMaker;
+import org.apache.druid.java.util.common.parsers.JSONPathSpec;
+import org.apache.druid.java.util.common.parsers.ObjectFlattener;
+import org.apache.druid.java.util.common.parsers.ObjectFlatteners;
+import org.apache.druid.java.util.common.parsers.ParseException;
+import org.apache.thrift.TBase;
+import org.apache.thrift.TException;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+public class ThriftReader extends IntermediateRowParsingReader<byte[]>
+{
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+ private final InputRowSchema inputRowSchema;
+ private final InputEntity source;
+ private final String jarPath;
+ private final String thriftClassName;
+ private final ObjectFlattener<JsonNode> recordFlattener;
+
+ private volatile Class<TBase> thriftClass = null;
+
+ ThriftReader(
+ InputRowSchema inputRowSchema,
+ InputEntity source,
+ @Nullable String jarPath,
+ String thriftClassName,
+ @Nullable JSONPathSpec flattenSpec
+ )
+ {
+ this.inputRowSchema = inputRowSchema;
+ this.source = source;
+ this.jarPath = jarPath;
+ this.thriftClassName = thriftClassName;
+ this.recordFlattener = ObjectFlatteners.create(
+ flattenSpec,
+ new JSONFlattenerMaker(false, inputRowSchema.getDimensionsSpec().useSchemaDiscovery())
+ );
+ }
+
+ @Override
+ protected CloseableIterator<byte[]> intermediateRowIterator() throws IOException
+ {
+ return CloseableIterators.withEmptyBaggage(
+ Iterators.singletonIterator(IOUtils.toByteArray(source.open()))
+ );
+ }
+
+ @Override
+ protected InputEntity source()
+ {
+ return source;
+ }
+
+ @Override
+ protected List<InputRow> parseInputRows(byte[] intermediateRow) throws IOException, ParseException
+ {
+ return Collections.singletonList(MapInputRowParser.parse(inputRowSchema, toFlattenedMap(intermediateRow)));
+ }
+
+ @Override
+ protected List<Map<String, Object>> toMap(byte[] intermediateRow) throws IOException
+ {
+ return Collections.singletonList(toFlattenedMap(intermediateRow));
+ }
+
+ private Map<String, Object> toFlattenedMap(byte[] bytes) throws ParseException
+ {
+ try {
+ final Class<TBase> clazz = getThriftClass();
+ final TBase tbase = clazz.newInstance();
+ ThriftDeserialization.detectAndDeserialize(bytes, tbase);
+ final String json = ThriftDeserialization.SERIALIZER_SIMPLE_JSON.get().toString(tbase);
+ final JsonNode node = OBJECT_MAPPER.readTree(json);
+ return recordFlattener.flatten(node);
+ }
+ catch (TException | IOException e) {
+ throw new ParseException(null, e, "Failed to deserialize Thrift data");
+ }
+ catch (InstantiationException | IllegalAccessException e) {
+ throw new ParseException(null, e, "Failed to instantiate Thrift class [%s]", thriftClassName);
+ }
+ catch (ClassNotFoundException e) {
+ throw new ParseException(null, e, "Thrift class [%s] not found", thriftClassName);
+ }
+ }
+
+ @SuppressWarnings({"unchecked", "ReturnValueIgnored"})
+ private Class<TBase> getThriftClass() throws IOException, ClassNotFoundException, InstantiationException, IllegalAccessException
+ {
+ if (thriftClass == null) {
+ final Class<TBase> clazz;
+ if (jarPath != null) {
+ File jar = new File(jarPath);
+ URLClassLoader child = new URLClassLoader(
+ new URL[]{jar.toURI().toURL()},
+ this.getClass().getClassLoader()
+ );
+ clazz = (Class<TBase>) Class.forName(thriftClassName, true, child);
+ } else {
+ clazz = (Class<TBase>) Class.forName(thriftClassName);
+ }
+ // Verify the class can be instantiated
+ clazz.newInstance();
+ thriftClass = clazz;
+ }
+ return thriftClass;
+ }
+}
diff --git a/extensions-contrib/thrift-extensions/src/test/java/org/apache/druid/data/input/thrift/ThriftInputFormatTest.java b/extensions-contrib/thrift-extensions/src/test/java/org/apache/druid/data/input/thrift/ThriftInputFormatTest.java
new file mode 100644
index 00000000000..79a5d36a3c9
--- /dev/null
+++ b/extensions-contrib/thrift-extensions/src/test/java/org/apache/druid/data/input/thrift/ThriftInputFormatTest.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.thrift;
+
+import com.fasterxml.jackson.databind.Module;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
+import nl.jqno.equalsverifier.EqualsVerifier;
+import org.apache.druid.data.input.ColumnsFilter;
+import org.apache.druid.data.input.InputEntityReader;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.data.input.impl.ByteEntity;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.StringDimensionSchema;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.java.util.common.parsers.JSONPathFieldSpec;
+import org.apache.druid.java.util.common.parsers.JSONPathFieldType;
+import org.apache.druid.java.util.common.parsers.JSONPathSpec;
+import org.apache.thrift.TSerializer;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TJSONProtocol;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+public class ThriftInputFormatTest
+{
+ private static final String THRIFT_CLASS = "org.apache.druid.data.input.thrift.Book";
+
+ private Book book;
+ private InputRowSchema schema;
+ private JSONPathSpec flattenSpec;
+
+ @Before
+ public void setUp()
+ {
+ book = new Book()
+ .setDate("2016-08-29")
+ .setPrice(19.9)
+ .setTitle("title")
+ .setAuthor(new Author().setFirstName("first").setLastName("last"));
+
+ schema = new InputRowSchema(
+ new TimestampSpec("date", "auto", null),
+ new DimensionsSpec(Lists.newArrayList(
+ new StringDimensionSchema("title"),
+ new StringDimensionSchema("lastName")
+ )),
+ ColumnsFilter.all()
+ );
+
+ flattenSpec = new JSONPathSpec(
+ true,
+ Lists.newArrayList(
+ new JSONPathFieldSpec(JSONPathFieldType.ROOT, "title", "title"),
+ new JSONPathFieldSpec(JSONPathFieldType.PATH, "lastName", "$.author.lastName")
+ )
+ );
+ }
+
+ @Test
+ public void testParseCompact() throws Exception
+ {
+ TSerializer serializer = new TSerializer(new TCompactProtocol.Factory());
+ byte[] bytes = serializer.serialize(book);
+ assertParsedRow(bytes);
+ }
+
+ @Test
+ public void testParseBinaryBase64() throws Exception
+ {
+ TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory());
+ byte[] bytes = StringUtils.encodeBase64(serializer.serialize(book));
+ assertParsedRow(bytes);
+ }
+
+ @Test
+ public void testParseJson() throws Exception
+ {
+ TSerializer serializer = new TSerializer(new TJSONProtocol.Factory());
+ byte[] bytes = serializer.serialize(book);
+ assertParsedRow(bytes);
+ }
+
+ @Test
+ public void testParseWithJarPath() throws Exception
+ {
+ TSerializer serializer = new TSerializer(new TCompactProtocol.Factory());
+ byte[] bytes = serializer.serialize(book);
+
+ ThriftInputFormat format = new ThriftInputFormat(flattenSpec, "example/book.jar", THRIFT_CLASS);
+ InputRow row = readSingleRow(format, bytes);
+ Assert.assertEquals("title", row.getDimension("title").get(0));
+ Assert.assertEquals("last", row.getDimension("lastName").get(0));
+ }
+
+ @Test
+ public void testSerde() throws Exception
+ {
+ ObjectMapper mapper = new DefaultObjectMapper();
+ for (Module module : new ThriftExtensionsModule().getJacksonModules()) {
+ mapper.registerModule(module);
+ }
+
+ ThriftInputFormat format = new ThriftInputFormat(flattenSpec, null, THRIFT_CLASS);
+ String json = mapper.writeValueAsString(format);
+ ThriftInputFormat deserialized = (ThriftInputFormat) mapper.readValue(json, org.apache.druid.data.input.InputFormat.class);
+
+ Assert.assertEquals(format, deserialized);
+ }
+
+ @Test
+ public void testIsSplittable()
+ {
+ ThriftInputFormat format = new ThriftInputFormat(null, null, THRIFT_CLASS);
+ Assert.assertFalse(format.isSplittable());
+ }
+
+ @Test
+ public void testEquals()
+ {
+ EqualsVerifier.forClass(ThriftInputFormat.class).usingGetClass().verify();
+ }
+
+ private void assertParsedRow(byte[] bytes) throws IOException
+ {
+ ThriftInputFormat format = new ThriftInputFormat(flattenSpec, null, THRIFT_CLASS);
+ InputRow row = readSingleRow(format, bytes);
+ Assert.assertEquals("title", row.getDimension("title").get(0));
+ Assert.assertEquals("last", row.getDimension("lastName").get(0));
+ }
+
+ private InputRow readSingleRow(ThriftInputFormat format, byte[] bytes) throws IOException
+ {
+ InputEntityReader reader = format.createReader(schema, new ByteEntity(bytes), null);
+ try (CloseableIterator<InputRow> iterator = reader.read()) {
+ Assert.assertTrue(iterator.hasNext());
+ InputRow row = iterator.next();
+ Assert.assertFalse(iterator.hasNext());
+ return row;
+ }
+ }
+}
diff --git a/website/.spelling b/website/.spelling
index 86b6c0b6f7d..cc00e32f1a4 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -563,6 +563,7 @@ stdout
storages
stringDictionaryEncoding
stringified
+structs
sub-conditions
subarray
subnet
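For reference, the `ThriftInputFormat` registered above under the type name `thrift` would be configured in an ingestion spec roughly as follows. This is a sketch based on the `@JsonProperty` names in the diff (`thriftJar`, `thriftClass`, `flattenSpec`); the topic name, jar path, and Thrift class are illustrative placeholders, not values from this commit:

```json
{
  "ioConfig": {
    "topic": "my-thrift-topic",
    "inputFormat": {
      "type": "thrift",
      "thriftJar": "/opt/druid/user-libs/my-thrift-types.jar",
      "thriftClass": "org.example.MyThriftEvent",
      "flattenSpec": {
        "useFieldDiscovery": true,
        "fields": [
          { "type": "path", "name": "lastName", "expr": "$.author.lastName" }
        ]
      }
    }
  }
}
```

`thriftJar` is optional; when omitted, the class is loaded from the classpath, as `ThriftReader.getThriftClass()` shows.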
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]