Custom InputRowParser example:
>I created the jar under 
>/resources/META-INF/services/io.druid.initialization.DruidModule
>uploaded the jar under druid/extensions/druid-reporting-transformer
>added the custom extension to Druid in 
>conf-quickstart/druid/_common/common.runtime.properties

>druid.extensions.loadList=["druid-reporting-transformer"]
 
This is my custom InputRowParser:
/**
 * Input row parser that decodes a UTF-8 JSON message, adds team fields to the first task
 * instance entity found under {@code payload.outboundMsg.taskEntities[0]}, re-serializes
 * the message and hands it to a {@link StringInputRowParser} built from the same parse spec.
 */
public class ExampleInputRowParser implements ByteBufferInputRowParser {
    // NOTE(review): the ingestion spec posted with this class uses "type": "reportingParser".
    // Confirm that the subtype name registered in the DruidModule matches the spec.
    public static final String TYPE_NAME = "exampleParser";

    private final ParseSpec parseSpec;

    // Created on the first parseBatch call rather than in the constructor.
    private StringInputRowParser stringParser;

    /**
     * @param parseSpec parse spec used to build the delegate string parser
     */
    @JsonCreator
    public ExampleInputRowParser(@JsonProperty("parseSpec") ParseSpec parseSpec) {
        this.parseSpec = parseSpec;
    }

    @Override
    public ParseSpec getParseSpec() {
        return parseSpec;
    }

    /**
     * Parses one message into at most one input row.
     *
     * @param input UTF-8 encoded JSON message
     * @return a single-element list with the parsed row, or an empty list when the message
     *         could not be read as JSON (never {@code null})
     */
    @Override
    public List<InputRow> parseBatch(ByteBuffer input) {
        if (stringParser == null) {
            stringParser = new StringInputRowParser(parseSpec);
        }
        String rawJson = StringUtils.fromUtf8(input);
        try {
            Map requestMap = (Map) JsonUtil.stringToObj(rawJson, Map.class);
            populateTeam(requestMap);
            String enrichedJson = JsonUtil.deepString(requestMap);
            return ImmutableList.of(stringParser.parse(enrichedJson));
        } catch (IOException e) {
            // Best effort: report the failure and skip the message. An empty list is returned
            // instead of null so callers can iterate the result safely.
            // NOTE(review): consider rethrowing as Druid's ParseException so the message is
            // counted as unparseable - confirm against the Druid version in use.
            e.printStackTrace();
            return ImmutableList.of();
        }
    }

    @Override
    public ByteBufferInputRowParser withParseSpec(ParseSpec parseSpec) {
        return new ExampleInputRowParser(parseSpec);
    }

    /**
     * Adds {@code teamId}, {@code teamName} and {@code parent} to the first element of
     * {@code payload.outboundMsg.taskEntities[0].taskInstanceEntities}. The map is modified
     * in place. Nothing happens when any step of that path is missing, empty, or not of the
     * expected container type.
     *
     * @param requestMap decoded message; may be {@code null}
     */
    public void populateTeam(Map requestMap) {
        if (requestMap == null) {
            return;
        }
        Map<String, Object> payload = asMap(requestMap.get("payload"));
        if (payload == null) {
            return;
        }
        Map<String, Object> outboundMsg = asMap(payload.get("outboundMsg"));
        if (outboundMsg == null) {
            return;
        }
        Map<String, Object> firstTaskEntity = firstMapOf(outboundMsg.get("taskEntities"));
        if (firstTaskEntity == null) {
            return;
        }
        Map<String, Object> taskMap = firstMapOf(firstTaskEntity.get("taskInstanceEntities"));
        if (taskMap == null) {
            return;
        }
        // Placeholder values; the original code hard-codes them as well.
        taskMap.put("teamId", "userteamid");
        taskMap.put("teamName", "userteamName");
        taskMap.put("parent", "userteamtead");
    }

    /** Returns {@code candidate} as a map, or {@code null} when it is not a map. */
    @SuppressWarnings("unchecked") // assumes decoded JSON objects have String keys - TODO confirm
    private static Map<String, Object> asMap(Object candidate) {
        if (candidate instanceof Map) {
            return (Map<String, Object>) candidate;
        }
        return null;
    }

    /** Returns the first element of {@code candidate} when it is a non-empty list whose first element is a map. */
    private static Map<String, Object> firstMapOf(Object candidate) {
        if (!(candidate instanceof List)) {
            return null;
        }
        List<?> elements = (List<?>) candidate;
        if (elements.isEmpty()) {
            return null;
        }
        return asMap(elements.get(0));
    }
}

And restarted all Druid nodes: 
[historical, broker, overlord, coordinator, middleManager]

Create the topic in Kafka:
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 
1 --partitions 1 --topic Reporting_Transformer

 Create InputSpec in druid under quickstart
 reporting-transformerspec.json:
 
  {
  "dataSchema": {
    "dataSource": "Reporting_Transformer",
    "parser": {
      "type": "reportingParser",
      "parseSpec": {
        "format": "json",
        "flattenSpec": {
          "useFieldDiscovery": true,
          "fields": [

            {
              "type": "path",
              "name": "task_name",
              "expr": 
"$.payload.outboundMsg.taskEntities[0].taskInstanceEntities[0].taskName"
            },


{
              "type": "path",
              "name": "team_name",
              "expr": 
"$.payload.outboundMsg.taskEntities[0].taskInstanceEntities[0].teamName"
            },

            {
              "type": "path",
              "name": "team_id",
              "expr": 
"$.payload.outboundMsg.taskEntities[0].taskInstanceEntities[0].teamId"
            },
            {
              "type": "path",
              "name": "parent",
              "expr": 
"$.payload.outboundMsg.taskEntities[0].taskInstanceEntities[0].parent"
            },
         {
              "type": "path",
              "name": "created_date",
              "expr": 
"$.payload.outboundMsg.taskEntities[0].taskInstanceEntities[0].createdDate"
            }
      ]
        },
        "timestampSpec": {
          "column": "created_date",
          "format": "auto"
        },
        "dimensionsSpec": {
          "dimensions": ["task_name", "team_id", "team_name", "parent",{"name": 
"created_date", "type":"long"}]
        }
      }
    },
    "metricsSpec" : [],
    "transformSpec": {
      "transforms": []
    },
    "granularitySpec": {
      "type": "uniform",
      "segmentGranularity": "HOUR",
      "queryGranularity": "NONE",
      "rollup": true
    }
  },
  "ioConfig": {
    "topic": "Reporting_Transformer",
    "consumerProperties": {
      "bootstrap.servers": "localhost:9092"
    }

> Submit the spec to the supervisor:

> curl -XPOST -H'Content-Type: application/json' -d 
> @quickstart/reporting-transformerspec.json 
> http://localhost:8090/druid/indexer/v1/supervisor

> Send JSON data from Kafka to Druid:
> ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic 
> Reporting_Transformer 

{"payload":{"outboundMsg":{"taskEntities": [
    {"taskInstanceEntities":[ 
{"taskName":"process123","created_date":1566222246780}] }
    
  ]}
}}

When I submitted the query spec, the result was empty.




---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@druid.apache.org
For additional commands, e-mail: dev-h...@druid.apache.org

Reply via email to