Hey Dan,

Could you please file JIRAs and put the JIRA name as the prefix to your commits?
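For example, the subject for the commit below would look something like this (AIRFLOW-123 is just a placeholder key, not a real issue):

    AIRFLOW-123: use targetPartitionSize as the default partition spec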
Cheers,
Chris

On Mon, May 23, 2016 at 5:01 PM, <[email protected]> wrote:

> Repository: incubator-airflow
> Updated Branches:
>   refs/heads/airbnb_rb1.7.1_4 1d0d8681d -> 6f7ea90ae
>
>
> use targetPartitionSize as the default partition spec
>
>
> Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
> Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/b58b5e09
> Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/b58b5e09
> Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/b58b5e09
>
> Branch: refs/heads/airbnb_rb1.7.1_4
> Commit: b58b5e09578d8a0df17b4de12fe3b49792e9feda
> Parents: 1d0d868
> Author: Hongbo Zeng <[email protected]>
> Authored: Sat May 14 17:00:42 2016 -0700
> Committer: Dan Davydov <[email protected]>
> Committed: Mon May 23 16:59:52 2016 -0700
>
> ----------------------------------------------------------------------
>  airflow/hooks/druid_hook.py        | 23 ++++++++++++++++-------
>  airflow/operators/hive_to_druid.py |  8 +++++---
>  2 files changed, 21 insertions(+), 10 deletions(-)
> ----------------------------------------------------------------------
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b58b5e09/airflow/hooks/druid_hook.py
> ----------------------------------------------------------------------
> diff --git a/airflow/hooks/druid_hook.py b/airflow/hooks/druid_hook.py
> index b6cb231..7c80c7c 100644
> --- a/airflow/hooks/druid_hook.py
> +++ b/airflow/hooks/druid_hook.py
> @@ -10,7 +10,7 @@ from airflow.hooks.base_hook import BaseHook
>  from airflow.exceptions import AirflowException
>
>  LOAD_CHECK_INTERVAL = 5
> -
> +TARGET_PARTITION_SIZE = 5000000
>
>  class AirflowDruidLoadException(AirflowException):
>      pass
> @@ -52,13 +52,22 @@ class DruidHook(BaseHook):
>
>      def construct_ingest_query(
>              self, datasource, static_path, ts_dim, columns, metric_spec,
> -            intervals, num_shards, hadoop_dependency_coordinates=None):
> +            intervals, num_shards, target_partition_size, hadoop_dependency_coordinates=None):
>          """
>          Builds an ingest query for an HDFS TSV load.
>
>          :param datasource: target datasource in druid
>          :param columns: list of all columns in the TSV, in the right order
>          """
> +
> +        # backward compatibilty for num_shards, but target_partition_size is the default setting
> +        # and overwrites the num_shards
> +        if target_partition_size == -1:
> +            if num_shards == -1:
> +                target_partition_size = TARGET_PARTITION_SIZE
> +        else:
> +            num_shards = -1
> +
>          metric_names = [
>              m['fieldName'] for m in metric_spec if m['type'] != 'count']
>          dimensions = [c for c in columns if c not in metric_names and c != ts_dim]
> @@ -100,7 +109,7 @@
>                  },
>                  "partitionsSpec" : {
>                      "type" : "hashed",
> -                    "targetPartitionSize" : -1,
> +                    "targetPartitionSize" : target_partition_size,
>                      "numShards" : num_shards,
>                  },
>              },
> @@ -121,10 +130,10 @@ class DruidHook(BaseHook):
>
>      def send_ingest_query(
>              self, datasource, static_path, ts_dim, columns, metric_spec,
> -            intervals, num_shards, hadoop_dependency_coordinates=None):
> +            intervals, num_shards, target_partition_size, hadoop_dependency_coordinates=None):
>          query = self.construct_ingest_query(
>              datasource, static_path, ts_dim, columns,
> -            metric_spec, intervals, num_shards, hadoop_dependency_coordinates)
> +            metric_spec, intervals, num_shards, target_partition_size, hadoop_dependency_coordinates)
>          r = requests.post(
>              self.ingest_post_url, headers=self.header, data=query)
>          logging.info(self.ingest_post_url)
> @@ -138,7 +147,7 @@ class DruidHook(BaseHook):
>
>      def load_from_hdfs(
>              self, datasource, static_path, ts_dim, columns,
> -            intervals, num_shards, metric_spec=None, hadoop_dependency_coordinates=None):
> +            intervals, num_shards, target_partition_size, metric_spec=None, hadoop_dependency_coordinates=None):
>          """
>          load data to druid from hdfs
>          :params ts_dim: The column name to use as a timestamp
>          """
>          task_id = self.send_ingest_query(
>              datasource, static_path, ts_dim, columns, metric_spec,
> -            intervals, num_shards, hadoop_dependency_coordinates)
> +            intervals, num_shards, target_partition_size, hadoop_dependency_coordinates)
>          status_url = self.get_ingest_status_url(task_id)
>          while True:
>              r = requests.get(status_url)
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b58b5e09/airflow/operators/hive_to_druid.py
> ----------------------------------------------------------------------
> diff --git a/airflow/operators/hive_to_druid.py b/airflow/operators/hive_to_druid.py
> index 1346841..420aeed 100644
> --- a/airflow/operators/hive_to_druid.py
> +++ b/airflow/operators/hive_to_druid.py
> @@ -49,7 +49,8 @@ class HiveToDruidTransfer(BaseOperator):
>              metastore_conn_id='metastore_default',
>              hadoop_dependency_coordinates=None,
>              intervals=None,
> -            num_shards=1,
> +            num_shards=-1,
> +            target_partition_size=-1,
>              *args, **kwargs):
>          super(HiveToDruidTransfer, self).__init__(*args, **kwargs)
>          self.sql = sql
> @@ -57,6 +58,7 @@ class HiveToDruidTransfer(BaseOperator):
>          self.ts_dim = ts_dim
>          self.intervals = intervals or ['{{ ds }}/{{ tomorrow_ds }}']
>          self.num_shards = num_shards
> +        self.target_partition_size = target_partition_size
>          self.metric_spec = metric_spec or [{
>              "name": "count",
>              "type": "count"}]
> @@ -103,8 +105,8 @@ class HiveToDruidTransfer(BaseOperator):
>              datasource=self.druid_datasource,
>              intervals=self.intervals,
>              static_path=static_path, ts_dim=self.ts_dim,
> -            columns=columns, num_shards=self.num_shards, metric_spec=self.metric_spec,
> -            hadoop_dependency_coordinates=self.hadoop_dependency_coordinates)
> +            columns=columns, num_shards=self.num_shards, target_partition_size=self.target_partition_size,
> +            metric_spec=self.metric_spec, hadoop_dependency_coordinates=self.hadoop_dependency_coordinates)
>          logging.info("Load seems to have succeeded!")
>
>          logging.info(
>
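For anyone following along, here's a minimal sketch of how the new parameter would be used from a DAG. The import path follows airflow/operators/hive_to_druid.py above; the DAG id, task id, SQL, datasource, and timestamp column are made-up placeholders:

    from datetime import datetime

    from airflow import DAG
    from airflow.operators.hive_to_druid import HiveToDruidTransfer

    dag = DAG('hive_to_druid_example', start_date=datetime(2016, 5, 23))

    HiveToDruidTransfer(
        task_id='load_my_table_to_druid',    # placeholder task id
        sql='SELECT * FROM my_db.my_table',  # placeholder Hive query
        druid_datasource='my_datasource',    # placeholder datasource
        ts_dim='ts',                         # placeholder timestamp column
        # target_partition_size now takes precedence; leaving num_shards at
        # its new default (-1) lets Druid size partitions by row count
        target_partition_size=5000000,
        dag=dag)

Per the hook logic above, passing an explicit target_partition_size resets num_shards to -1, and if both are left at -1 the hook falls back to TARGET_PARTITION_SIZE (5000000).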
