Repository: incubator-airflow
Updated Branches:
  refs/heads/master e6cc1c055 -> 0e568602f


[AIRFLOW-475] make the segment granularity in Druid hook configurable

The Druid hook currently hardcodes
`segmentGranularity` to "DAY"; it needs to be
configurable for different use cases.

mistercrunch aoen plypaul

Closes #1771 from hongbozeng/hongbo/segment_granularity
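
For context, a minimal sketch of the granularitySpec fragment the hook builds
once the new parameters are threaded through; the interval and the
"MINUTE"/"HOUR" values below are illustrative choices, not defaults:

    # Illustrative only: shape of the granularitySpec emitted by the hook
    granularity_spec = {
        "queryGranularity": "MINUTE",             # was hardcoded to "NONE"
        "intervals": ["2016-08-31/2016-09-01"],   # example interval
        "type": "uniform",
        "segmentGranularity": "HOUR",             # was hardcoded to "DAY"
    }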


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/0e568602
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/0e568602
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/0e568602

Branch: refs/heads/master
Commit: 0e568602f869f20a3cd424ed11961c275468ad1e
Parents: e6cc1c0
Author: Hongbo Zeng <hongbo.z...@airbnb.com>
Authored: Wed Aug 31 15:56:00 2016 -0700
Committer: Dan Davydov <dan.davy...@airbnb.com>
Committed: Wed Aug 31 15:59:18 2016 -0700

----------------------------------------------------------------------
 airflow/hooks/druid_hook.py        | 20 +++++++++++++-------
 airflow/operators/hive_to_druid.py |  5 +++++
 2 files changed, 18 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0e568602/airflow/hooks/druid_hook.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/druid_hook.py b/airflow/hooks/druid_hook.py
index 61d3c40..9f0a022 100644
--- a/airflow/hooks/druid_hook.py
+++ b/airflow/hooks/druid_hook.py
@@ -67,7 +67,9 @@ class DruidHook(BaseHook):
 
     def construct_ingest_query(
             self, datasource, static_path, ts_dim, columns, metric_spec,
-            intervals, num_shards, target_partition_size, hadoop_dependency_coordinates=None):
+            intervals, num_shards, target_partition_size,
+            query_granularity="NONE", segment_granularity="DAY",
+            hadoop_dependency_coordinates=None):
         """
         Builds an ingest query for an HDFS TSV load.
 
@@ -92,10 +94,10 @@ class DruidHook(BaseHook):
                 "dataSchema": {
                     "metricsSpec": metric_spec,
                     "granularitySpec": {
-                        "queryGranularity": "NONE",
+                        "queryGranularity": query_granularity,
                         "intervals": intervals,
                         "type": "uniform",
-                        "segmentGranularity": "DAY",
+                        "segmentGranularity": segment_granularity,
                     },
                     "parser": {
                         "type": "string",
@@ -145,10 +147,12 @@ class DruidHook(BaseHook):
 
     def send_ingest_query(
             self, datasource, static_path, ts_dim, columns, metric_spec,
-            intervals, num_shards, target_partition_size, hadoop_dependency_coordinates=None):
+            intervals, num_shards, target_partition_size, query_granularity, segment_granularity,
+            hadoop_dependency_coordinates=None):
         query = self.construct_ingest_query(
             datasource, static_path, ts_dim, columns,
-            metric_spec, intervals, num_shards, target_partition_size, hadoop_dependency_coordinates)
+            metric_spec, intervals, num_shards, target_partition_size,
+            query_granularity, segment_granularity, hadoop_dependency_coordinates)
         r = requests.post(
             self.ingest_post_url, headers=self.header, data=query)
         logging.info(self.ingest_post_url)
@@ -162,7 +166,8 @@ class DruidHook(BaseHook):
 
     def load_from_hdfs(
             self, datasource, static_path,  ts_dim, columns,
-            intervals, num_shards, target_partition_size, metric_spec=None, hadoop_dependency_coordinates=None):
+            intervals, num_shards, target_partition_size, query_granularity, segment_granularity,
+            metric_spec=None, hadoop_dependency_coordinates=None):
         """
         load data to druid from hdfs
 
@@ -171,7 +176,8 @@ class DruidHook(BaseHook):
         """
         task_id = self.send_ingest_query(
             datasource, static_path, ts_dim, columns, metric_spec,
-            intervals, num_shards, target_partition_size, hadoop_dependency_coordinates)
+            intervals, num_shards, target_partition_size, query_granularity, segment_granularity,
+            hadoop_dependency_coordinates)
         status_url = self.get_ingest_status_url(task_id)
         while True:
             r = requests.get(status_url)
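
With the change above, callers pass both granularities when loading from HDFS.
A minimal sketch of a direct hook call, assuming the hook's default ingest
connection is configured; the datasource, path, columns, and sizes below are
placeholders:

    from airflow.hooks.druid_hook import DruidHook

    hook = DruidHook()  # assumes the default Druid ingest connection exists
    hook.load_from_hdfs(
        datasource='events_example',             # placeholder datasource
        static_path='/tmp/druid/events.tsv',     # placeholder HDFS path
        ts_dim='ts',
        columns=['ts', 'user_id', 'country'],
        intervals=['2016-08-31/2016-09-01'],
        num_shards=-1,
        target_partition_size=5000000,
        query_granularity='NONE',                # the old hardcoded value, now explicit
        segment_granularity='HOUR',              # old hardcoded value was 'DAY'
    )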

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0e568602/airflow/operators/hive_to_druid.py
----------------------------------------------------------------------
diff --git a/airflow/operators/hive_to_druid.py b/airflow/operators/hive_to_druid.py
index 5ed5145..17b8188 100644
--- a/airflow/operators/hive_to_druid.py
+++ b/airflow/operators/hive_to_druid.py
@@ -66,6 +66,8 @@ class HiveToDruidTransfer(BaseOperator):
             intervals=None,
             num_shards=-1,
             target_partition_size=-1,
+            query_granularity=None,
+            segment_granularity=None,
             *args, **kwargs):
         super(HiveToDruidTransfer, self).__init__(*args, **kwargs)
         self.sql = sql
@@ -74,6 +76,8 @@ class HiveToDruidTransfer(BaseOperator):
         self.intervals = intervals or ['{{ ds }}/{{ tomorrow_ds }}']
         self.num_shards = num_shards
         self.target_partition_size = target_partition_size
+        self.query_granularity = query_granularity
+        self.segment_granularity = segment_granularity
         self.metric_spec = metric_spec or [{
             "name": "count",
             "type": "count"}]
@@ -122,6 +126,7 @@ class HiveToDruidTransfer(BaseOperator):
                 intervals=self.intervals,
                 static_path=static_path, ts_dim=self.ts_dim,
                 columns=columns, num_shards=self.num_shards, target_partition_size=self.target_partition_size,
+                query_granularity=self.query_granularity, segment_granularity=self.segment_granularity,
                 metric_spec=self.metric_spec, hadoop_dependency_coordinates=self.hadoop_dependency_coordinates)
             logging.info("Load seems to have succeeded!")
         finally:
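
A hedged usage sketch of the updated operator in a DAG; the task id, SQL,
datasource, and the existing `dag` object are invented for illustration:

    from airflow.operators.hive_to_druid import HiveToDruidTransfer

    load_events = HiveToDruidTransfer(
        task_id='hive_to_druid_events_example',  # placeholder task id
        sql="SELECT ts, user_id, country FROM events WHERE ds = '{{ ds }}'",
        druid_datasource='events_example',       # placeholder datasource
        ts_dim='ts',
        query_granularity='NONE',
        segment_granularity='HOUR',              # overrides the previously hardcoded 'DAY'
        dag=dag,                                 # assumes an existing DAG object
    )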
