Custom InputRowParser example:
> I created the jar under
> /resources/META-INF/services/io.druid.initialization.DruidModule
> uploaded the jar under druid/extensions/druid-reporting-transformer
> added the custom extension to Druid in
> conf-quickstart/druid/_common/common.runtime.properties
>druid.extensions.loadList=["druid-reporting-transformer"] This is my customInputrowparser:

/**
 * Input row parser that decodes a UTF-8 JSON message, injects team attributes
 * into the first task instance it contains, re-serializes the document and
 * hands it to a {@link StringInputRowParser} built from the same parse spec.
 *
 * <p>NOTE(review): {@link #TYPE_NAME} is {@code "exampleParser"}. Presumably the
 * Druid module registers this class under that name, in which case an ingestion
 * spec must use the same value for {@code parser.type} - confirm against the
 * module's Jackson subtype registration.
 *
 * <p>The delegate parser is created lazily without synchronization, exactly as
 * before; this class makes no thread-safety guarantee.
 */
public class ExampleInputRowParser implements ByteBufferInputRowParser {
  public static final String TYPE_NAME = "exampleParser";

  private final ParseSpec parseSpec;
  private StringInputRowParser stringParser;

  /**
   * @param parseSpec parse spec forwarded to the delegate string parser
   */
  @JsonCreator
  public ExampleInputRowParser(@JsonProperty("parseSpec") ParseSpec parseSpec) {
    this.parseSpec = parseSpec;
  }

  @Override
  public ParseSpec getParseSpec() {
    return parseSpec;
  }

  /**
   * Parses one message into a single-element list of rows.
   *
   * @param input buffer holding the UTF-8 encoded JSON document
   * @return a one-element list with the row produced by the delegate parser
   * @throws java.io.UncheckedIOException if the JSON helper reports an
   *     {@link IOException}; previously the error was printed and {@code null}
   *     was returned, which hid the real cause from the caller
   */
  @Override
  public List<InputRow> parseBatch(ByteBuffer input) {
    if (stringParser == null) {
      stringParser = new StringInputRowParser(parseSpec);
    }
    String objstring = StringUtils.fromUtf8(input);
    try {
      Map requestMap = (Map) JsonUtil.stringToObj(objstring, Map.class);
      populateTeam(requestMap);
      String stringInput = JsonUtil.deepString(requestMap);
      return ImmutableList.of(stringParser.parse(stringInput));
    } catch (IOException e) {
      // Fail loudly and keep the cause instead of returning null.
      // NOTE(review): Druid's own ParseException may be the better type here so
      // that the row is counted as unparseable - confirm and import if so.
      throw new java.io.UncheckedIOException("Failed to parse input row as JSON", e);
    }
  }

  @Override
  public ByteBufferInputRowParser withParseSpec(ParseSpec parseSpec) {
    return new ExampleInputRowParser(parseSpec);
  }

  /**
   * Adds {@code teamId}, {@code teamName} and {@code parent} to the first entry
   * of {@code payload.outboundMsg.taskEntities[0].taskInstanceEntities}.
   *
   * <p>The map is modified in place. If any level of that path is missing, has
   * an unexpected type, or is an empty list, the method returns without
   * changing anything instead of throwing.
   *
   * @param requestMap decoded JSON document; may be {@code null}
   */
  @SuppressWarnings({"rawtypes", "unchecked"})
  public void populateTeam(Map requestMap) {
    if (requestMap == null) {
      return;
    }
    Object payload = requestMap.get("payload");
    if (!(payload instanceof Map)) {
      return;
    }
    Object outboundMsg = ((Map<?, ?>) payload).get("outboundMsg");
    if (!(outboundMsg instanceof Map)) {
      return;
    }
    Object firstTaskEntity = firstElement(((Map<?, ?>) outboundMsg).get("taskEntities"));
    if (!(firstTaskEntity instanceof Map)) {
      return;
    }
    Object firstTaskInstance =
        firstElement(((Map<?, ?>) firstTaskEntity).get("taskInstanceEntities"));
    if (!(firstTaskInstance instanceof Map)) {
      return;
    }
    // Keys come from JSON, so they are strings; the cast only widens the value type.
    Map<String, Object> taskMap = (Map<String, Object>) firstTaskInstance;
    // Fixed placeholder values, written unconditionally as in the original code.
    taskMap.put("teamId", "userteamid");
    taskMap.put("teamName", "userteamName");
    taskMap.put("parent", "userteamtead");
  }

  /** Returns the first element of {@code value} if it is a non-empty list, else {@code null}. */
  private static Object firstElement(Object value) {
    if (!(value instanceof List)) {
      return null;
    }
    List<?> list = (List<?>) value;
    return list.isEmpty() ? null : list.get(0);
  }
}

And restarted all druid nodes: [historical,broker,overload,coordinator,middlemanager] create topic in
kafka: ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic Reporting_Transformer
Create the input spec in Druid under quickstart/reporting-transformerspec.json:
{ "dataSchema": { "dataSource": "Reporting_Transformer", "parser": { "type": "reportingParser", "parseSpec": { "format": "json", "flattenSpec": { "useFieldDiscovery": true, "fields": [ { "type": "path", "name": "task_name", "expr": "$.payload.outboundMsg.taskEntities[0].taskInstanceEntities[0].taskName" }, { "type": "path", "name": "team_name", "expr": "$.payload.outboundMsg.taskEntities[0].taskInstanceEntities[0].teamName" }, { "type": "path", "name": "team_id", "expr": "$.payload.outboundMsg.taskEntities[0].taskInstanceEntities[0].teamId" }, { "type": "path", "name": "parent", "expr": "$.payload.outboundMsg.taskEntities[0].taskInstanceEntities[0].parent" }, { "type": "path", "name": "created_date", "expr": "$.payload.outboundMsg.taskEntities[0].taskInstanceEntities[0].createdDate" } ] }, "timestampSpec": { "column": "created_date", "format": "auto" }, "dimensionsSpec": { "dimensions": ["task_name", "team_id", "team_name", "parent",{"name": "created_date", "type":"long"}] } } }, "metricsSpec" : [], "transformSpec": { "transforms": [] }, "granularitySpec": { "type": "uniform", "segmentGranularity": "HOUR", "queryGranularity": "NONE", "rollup": true } }, "ioConfig": { "topic": "Reporting_Transformer", "consumerProperties": { "bootstrap.servers": "localhost:9092" }
> Submit the spec to the supervisor:
> curl -XPOST -H'Content-Type: application/json' -d @quickstart/reporting-transformerspec.json http://localhost:8090/druid/indexer/v1/supervisor
> Send JSON data from Kafka to Druid:
> ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Reporting_Transformer
{"payload":{"outboundMsg":{"taskEntities": [ {"taskInstanceEntities":[ {"taskName":"process123","created_date":1566222246780}] } ]} }}
When I submitted the query spec, the result was empty.
--------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@druid.apache.org For additional commands, e-mail: dev-h...@druid.apache.org