Hi,

IngestSegmentFirehose is a firehose that reads data from an existing Druid
dataSource. Since its input format is always fixed (Druid's internal segment
format), it doesn't need a parser; it only needs a transformSpec to
transform the data properly during reingestion. Could you tell me why you want
to specify a parser with IngestSegmentFirehose?
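
For reference, here is a rough sketch of where a transformSpec would sit in your
dataSchema. The transform shown is only illustrative (the clusterTimeMillis name
and the expression are made up for this example) and is not equivalent to your jq
flatten expression:

  "dataSchema" : {
    "dataSource" : "cp37-data8",
    "transformSpec" : {
      "transforms" : [
        {
          "type" : "expression",
          "name" : "clusterTimeMillis",
          "expression" : "clusterTime * 1000"
        }
      ]
    },
    ...
  }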

Jihoon

On Mon, Jun 24, 2019 at 10:36 AM Косов Максим Владимирович <mko...@cbgr.ru>
wrote:

> Hello,
> I've become interested in Apache Druid and decided to study it. As an
> exercise, I'm trying to re-ingest data from one dataSource into another. For
> this I'm using the ingestSegment firehose, and in the parser definition I'm
> adding a flattenSpec (the parser type is string, the format is json).
> Here is the configuration:
> {
>   "type" : "index",
>   "spec" : {
>     "dataSchema" : {
>       "dataSource" : "cp37-data8",
>       "parser" : {
>         "type" : "string",
>         "parseSpec" : {
>           "format" : "json",
>           "timestampSpec" : {
>             "column" : "__time",
>             "format" : "auto"
>           },
>           "flattenSpec": {
>             "useFieldDiscovery": true,
>             "fields": [
>               {
>                 "type": "jq",
>                 "name": "resourceItemStatusDetails_updateDateTime",
>                 "expr": ".fullDocument_data | fromjson.resourceItemStatusDetails.updateDateTime.\"$date\""
>               }
>             ]
>           },
>           "dimensionsSpec" : {
>             "dimensions": [
>               "operationType",
>               "databaseName",
>               "collectionName",
>               "fullDocument_id",
>               "fullDocument_docId",
>               "resourceItemStatusDetails_updateDateTime",
>               {
>                 "type": "long",
>                 "name": "clusterTime"
>               }
>             ],
>             "dimensionExclusions" : [],
>             "spatialDimensions" : []
>           }
>         }
>       },
>       "metricsSpec" : [
>         {
>           "type" : "count",
>           "name" : "count"
>         }
>       ],
>       "granularitySpec" : {
>         "type" : "uniform",
>         "segmentGranularity" : "DAY",
>         "queryGranularity" : "NONE"
>       }
>     },
>     "ioConfig" : {
>       "type" : "index",
>       "firehose" : {
>         "type" : "ingestSegment",
>         "dataSource" : "cp-all-buffer",
>         "interval" : "2018-01-01/2020-01-03"
>       },
>       "appendToExisting" : false
>     },
>     "tuningConfig" : {
>       "type" : "index",
>       "maxRowsPerSegment" : 100000,
>       "maxRowsInMemory" : 1000
>     }
>   }
> }
>
> The task itself completes successfully, but the settings I configured in the
> parser are ignored during execution. I've looked at the Druid source code, and
> it seems I may have found a bug.
> If you look at the IngestSegmentFirehoseFactory class, you'll see that only the
> TransformSpec (extracted from the parser) is passed to the IngestSegmentFirehose
> constructor, but not the parser itself:
> final TransformSpec transformSpec = TransformSpec.fromInputRowParser(inputRowParser);
> return new IngestSegmentFirehose(adapters, transformSpec, dims, metricsList, dimFilter);
>
> Next, in IngestSegmentFirehose we create a transformer and perform the
> transformation:
>
> final InputRow inputRow = rowYielder.get();
> rowYielder = rowYielder.next(null);
> return transformer.transform(inputRow);
>
> At this point the call to the parser's parse method has already been lost,
> which explains why the parser settings in my example were ignored.
> This raises the question: why don't we just pass the parser itself to the
> IngestSegmentFirehose constructor? If you look at the implementation of the
> TransformSpec.fromInputRowParser method, you'll see that the parser is always
> either a transformer-carrying decorator or an error is thrown, and the parse
> methods of such decorating parsers always apply the transformer on top of the
> parsed rows anyway:
>
>
> parser.parseBatch(row).stream().map(transformer::transform).collect(Collectors.toList());
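>
> To make the decorator shape concrete, here is a rough, hypothetical sketch of
> the pattern I mean. The interfaces and names below are simplified stand-ins for
> illustration only, not the actual Druid classes:
>
> import java.util.List;
> import java.util.stream.Collectors;
>
> // Simplified, hypothetical stand-ins for the real Druid types.
> interface Row {}
> interface Parser { List<Row> parseBatch(Object input); }
> interface Transformer { Row transform(Row row); }
>
> // Decorator: the wrapped parser (with its parseSpec/flattenSpec) runs first,
> // then the transformer built from the TransformSpec is applied to each row.
> class TransformingParserSketch implements Parser
> {
>   private final Parser delegate;
>   private final Transformer transformer;
>
>   TransformingParserSketch(Parser delegate, Transformer transformer)
>   {
>     this.delegate = delegate;
>     this.transformer = transformer;
>   }
>
>   @Override
>   public List<Row> parseBatch(Object input)
>   {
>     return delegate.parseBatch(input).stream()
>         .map(transformer::transform)
>         .collect(Collectors.toList());
>   }
> }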
>
> Could anyone please clarify whether this is intentional behaviour or a bug?
> Thanks!
>
>
