Repository: incubator-airflow
Updated Branches:
  refs/heads/master e6cc1c055 -> 0e568602f
[AIRFLOW-475] Make the segment granularity in the Druid hook configurable

The Druid hook currently hardcodes `segmentGranularity` to "DAY"; it needs to
be configurable to support different use cases.

mistercrunch aoen plypaul

Closes #1771 from hongbozeng/hongbo/segment_granularity

Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/0e568602
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/0e568602
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/0e568602

Branch: refs/heads/master
Commit: 0e568602f869f20a3cd424ed11961c275468ad1e
Parents: e6cc1c0
Author: Hongbo Zeng <hongbo.z...@airbnb.com>
Authored: Wed Aug 31 15:56:00 2016 -0700
Committer: Dan Davydov <dan.davy...@airbnb.com>
Committed: Wed Aug 31 15:59:18 2016 -0700

----------------------------------------------------------------------
 airflow/hooks/druid_hook.py        | 20 +++++++++++++-------
 airflow/operators/hive_to_druid.py |  5 +++++
 2 files changed, 18 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
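For reference, a minimal usage sketch (not part of this commit) of the two new
operator arguments; the DAG, task_id, SQL, and datasource values below are
hypothetical. Note that HiveToDruidTransfer defaults both arguments to None and
passes them straight through to the hook, so callers will likely want to set
them explicitly:

    from datetime import datetime

    from airflow import DAG
    from airflow.operators.hive_to_druid import HiveToDruidTransfer

    dag = DAG('hive_to_druid_example', start_date=datetime(2016, 8, 31))

    load_hourly = HiveToDruidTransfer(
        task_id='hive_to_druid_hourly',
        sql="SELECT * FROM my_table WHERE ds = '{{ ds }}'",
        druid_datasource='my_datasource',
        ts_dim='timestamp_column',
        # previously hardcoded in the hook as "NONE" and "DAY"
        query_granularity='MINUTE',
        segment_granularity='HOUR',
        dag=dag)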
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0e568602/airflow/hooks/druid_hook.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/druid_hook.py b/airflow/hooks/druid_hook.py
index 61d3c40..9f0a022 100644
--- a/airflow/hooks/druid_hook.py
+++ b/airflow/hooks/druid_hook.py
@@ -67,7 +67,9 @@ class DruidHook(BaseHook):
 
     def construct_ingest_query(
             self, datasource, static_path, ts_dim, columns, metric_spec,
-            intervals, num_shards, target_partition_size, hadoop_dependency_coordinates=None):
+            intervals, num_shards, target_partition_size,
+            query_granularity="NONE", segment_granularity="DAY",
+            hadoop_dependency_coordinates=None):
         """
         Builds an ingest query for an HDFS TSV load.
@@ -92,10 +94,10 @@ class DruidHook(BaseHook):
             "dataSchema": {
                 "metricsSpec": metric_spec,
                 "granularitySpec": {
-                    "queryGranularity": "NONE",
+                    "queryGranularity": query_granularity,
                     "intervals": intervals,
                     "type": "uniform",
-                    "segmentGranularity": "DAY",
+                    "segmentGranularity": segment_granularity,
                 },
                 "parser": {
                     "type": "string",
@@ -145,10 +147,12 @@ class DruidHook(BaseHook):
 
     def send_ingest_query(
             self, datasource, static_path, ts_dim, columns, metric_spec,
-            intervals, num_shards, target_partition_size, hadoop_dependency_coordinates=None):
+            intervals, num_shards, target_partition_size, query_granularity, segment_granularity,
+            hadoop_dependency_coordinates=None):
         query = self.construct_ingest_query(
             datasource, static_path, ts_dim, columns,
-            metric_spec, intervals, num_shards, target_partition_size, hadoop_dependency_coordinates)
+            metric_spec, intervals, num_shards, target_partition_size,
+            query_granularity, segment_granularity, hadoop_dependency_coordinates)
         r = requests.post(
             self.ingest_post_url, headers=self.header, data=query)
         logging.info(self.ingest_post_url)
@@ -162,7 +166,8 @@ class DruidHook(BaseHook):
 
     def load_from_hdfs(
             self, datasource, static_path, ts_dim, columns,
-            intervals, num_shards, target_partition_size, metric_spec=None, hadoop_dependency_coordinates=None):
+            intervals, num_shards, target_partition_size, query_granularity, segment_granularity,
+            metric_spec=None, hadoop_dependency_coordinates=None):
         """
         load data to druid from hdfs
 
@@ -171,7 +176,8 @@ class DruidHook(BaseHook):
         """
         task_id = self.send_ingest_query(
             datasource, static_path, ts_dim, columns, metric_spec,
-            intervals, num_shards, target_partition_size, hadoop_dependency_coordinates)
+            intervals, num_shards, target_partition_size, query_granularity, segment_granularity,
+            hadoop_dependency_coordinates)
         status_url = self.get_ingest_status_url(task_id)
         while True:
             r = requests.get(status_url)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0e568602/airflow/operators/hive_to_druid.py
----------------------------------------------------------------------
diff --git a/airflow/operators/hive_to_druid.py b/airflow/operators/hive_to_druid.py
index 5ed5145..17b8188 100644
--- a/airflow/operators/hive_to_druid.py
+++ b/airflow/operators/hive_to_druid.py
@@ -66,6 +66,8 @@ class HiveToDruidTransfer(BaseOperator):
             intervals=None,
             num_shards=-1,
             target_partition_size=-1,
+            query_granularity=None,
+            segment_granularity=None,
             *args, **kwargs):
         super(HiveToDruidTransfer, self).__init__(*args, **kwargs)
         self.sql = sql
@@ -74,6 +76,8 @@ class HiveToDruidTransfer(BaseOperator):
         self.intervals = intervals or ['{{ ds }}/{{ tomorrow_ds }}']
         self.num_shards = num_shards
         self.target_partition_size = target_partition_size
+        self.query_granularity = query_granularity
+        self.segment_granularity = segment_granularity
         self.metric_spec = metric_spec or [{
             "name": "count",
             "type": "count"}]
@@ -122,6 +126,7 @@ class HiveToDruidTransfer(BaseOperator):
                 intervals=self.intervals,
                 static_path=static_path, ts_dim=self.ts_dim,
                 columns=columns, num_shards=self.num_shards,
                 target_partition_size=self.target_partition_size,
+                query_granularity=self.query_granularity, segment_granularity=self.segment_granularity,
                 metric_spec=self.metric_spec,
                 hadoop_dependency_coordinates=self.hadoop_dependency_coordinates)
             logging.info("Load seems to have succeeded!")
         finally:
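With this change, DruidHook.load_from_hdfs and send_ingest_query take
query_granularity and segment_granularity as required positional parameters.
A minimal sketch of a direct hook call (again, not part of the commit),
assuming the default Druid connections are configured; the datasource, path,
column, and interval values are hypothetical:

    from airflow.hooks.druid_hook import DruidHook

    hook = DruidHook()  # assumes default Druid connections are configured

    # these two values reproduce the old hardcoded behaviour; any valid
    # Druid granularity strings (e.g. "HOUR", "MINUTE") can be passed instead
    hook.load_from_hdfs(
        datasource='my_datasource',
        static_path='/tmp/druid/my_table',  # hypothetical HDFS path
        ts_dim='timestamp_column',
        columns=['timestamp_column', 'dim1', 'metric1'],
        intervals=['2016-08-31/2016-09-01'],
        num_shards=-1,
        target_partition_size=-1,
        query_granularity='NONE',
        segment_granularity='DAY')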