Repository: incubator-airflow
Updated Branches:
  refs/heads/master 984a87c0c -> 4cf904cf5


[AIRFLOW-855] Replace PickleType with LargeBinary in XCom

PickleType in Xcom allows remote code execution.
In order to deprecate
it without changing the mysql table schema, change
PickleType to LargeBinary
 because they both map to the blob type in mysql. Add
"enable_pickling" to the
function signature to control using either the pickle
type or JSON. "enable_pickling"
 should also be added to the core section of
airflow.cfg

Picked up where https://github.com/apache
/incubator-airflow/pull/2132 left off. Took this
PR, fixed merge conflicts, added
documentation/tests, fixed broken tests/operators,
and fixed the python3 issues.

Closes #2518 from aoen/disable-pickle-type


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/4cf904cf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/4cf904cf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/4cf904cf

Branch: refs/heads/master
Commit: 4cf904cf5a7a070bbeaf3a0e985ed2b840276015
Parents: 984a87c
Author: Dan Davydov <[email protected]>
Authored: Tue Aug 15 12:24:02 2017 -0700
Committer: Dan Davydov <[email protected]>
Committed: Tue Aug 15 12:24:07 2017 -0700

----------------------------------------------------------------------
 UPDATING.md                                     |   6 +
 airflow/config_templates/default_airflow.cfg    |   4 +
 airflow/config_templates/default_test.cfg       |   1 +
 airflow/contrib/operators/ssh_operator.py       |  10 +-
 ...c56_make_xcom_value_column_a_large_binary.py |  45 ++++++++
 airflow/models.py                               | 110 ++++++++++++++-----
 tests/contrib/operators/test_sftp_operator.py   |  84 +++++++++++++-
 tests/contrib/operators/test_ssh_operator.py    |  24 +++-
 tests/models.py                                 |  87 +++++++++++++++
 9 files changed, 338 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4cf904cf/UPDATING.md
----------------------------------------------------------------------
diff --git a/UPDATING.md b/UPDATING.md
index 3a880ab..92ee4b4 100644
--- a/UPDATING.md
+++ b/UPDATING.md
@@ -32,6 +32,12 @@ supported and will be removed entirely in Airflow 2.0
 
 - `contrib.hooks.gcp_dataflow_hook.DataFlowHook` starts to use 
`--runner=DataflowRunner` instead of `DataflowPipelineRunner`, which is removed 
from the package `google-cloud-dataflow-0.6.0`.
 
+- The pickle type for XCom messages has been replaced by json to prevent RCE 
attacks.
+  Note that JSON serialization is stricter than pickling, so if you want to 
e.g. pass
+  raw bytes through XCom you must encode them using an encoding like base64.
+  By default pickling is still enabled until Airflow 2.0. To disable it,
+  set enable_xcom_pickling = False in your Airflow config.
+
 ## Airflow 1.8.1
 
 The Airflow package name was changed from `airflow` to `apache-airflow` during 
this release. You must uninstall your

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4cf904cf/airflow/config_templates/default_airflow.cfg
----------------------------------------------------------------------
diff --git a/airflow/config_templates/default_airflow.cfg 
b/airflow/config_templates/default_airflow.cfg
index dcb99ed..b568d3a 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -129,6 +129,10 @@ logging_config_path =
 # Default to use file task handler.
 task_log_reader = file.task
 
+# Whether to enable pickling for xcom (note that this is insecure and allows 
for
+# RCE exploits). This will be deprecated in Airflow 2.0 (be forced to False).
+enable_xcom_pickling = True
+
 [cli]
 # In what way should the cli access the API. The LocalClient will use the
 # database directly, while the json_client will use the api running on the

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4cf904cf/airflow/config_templates/default_test.cfg
----------------------------------------------------------------------
diff --git a/airflow/config_templates/default_test.cfg 
b/airflow/config_templates/default_test.cfg
index 88b19a5..4452ffa 100644
--- a/airflow/config_templates/default_test.cfg
+++ b/airflow/config_templates/default_test.cfg
@@ -37,6 +37,7 @@ dag_concurrency = 16
 dags_are_paused_at_creation = False
 fernet_key = {FERNET_KEY}
 non_pooled_task_slot_count = 128
+enable_xcom_pickling = False
 
 [cli]
 api_client = airflow.api.client.local_client

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4cf904cf/airflow/contrib/operators/ssh_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/ssh_operator.py 
b/airflow/contrib/operators/ssh_operator.py
index ff874da..2e03b96 100644
--- a/airflow/contrib/operators/ssh_operator.py
+++ b/airflow/contrib/operators/ssh_operator.py
@@ -12,8 +12,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from base64 import b64encode
 import logging
 
+from airflow import configuration
 from airflow.contrib.hooks.ssh_hook import SSHHook
 from airflow.exceptions import AirflowException
 from airflow.models import BaseOperator
@@ -90,7 +92,13 @@ class SSHOperator(BaseOperator):
                 # only returning on output if do_xcom_push is set
                 # otherwise its not suppose to be disclosed
                 if self.do_xcom_push:
-                    return stdout.read()
+                    enable_pickling = configuration.getboolean('core',
+                                                               
'enable_xcom_pickling')
+                    if enable_pickling:
+                        return stdout.read()
+                    else:
+                        return b64encode(stdout.read()).decode('utf-8')
+
             else:
                 error_msg = stderr.read()
                 raise AirflowException("error running cmd: {0}, error: {1}"

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4cf904cf/airflow/migrations/versions/bdaa763e6c56_make_xcom_value_column_a_large_binary.py
----------------------------------------------------------------------
diff --git 
a/airflow/migrations/versions/bdaa763e6c56_make_xcom_value_column_a_large_binary.py
 
b/airflow/migrations/versions/bdaa763e6c56_make_xcom_value_column_a_large_binary.py
new file mode 100644
index 0000000..ea37d76
--- /dev/null
+++ 
b/airflow/migrations/versions/bdaa763e6c56_make_xcom_value_column_a_large_binary.py
@@ -0,0 +1,45 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Make xcom value column a large binary
+
+Revision ID: bdaa763e6c56
+Revises: cc1e65623dc7
+Create Date: 2017-08-14 16:06:31.568971
+
+"""
+
+# revision identifiers, used by Alembic.
+revision = 'bdaa763e6c56'
+down_revision = 'cc1e65623dc7'
+branch_labels = None
+depends_on = None
+
+from alembic import op
+import dill
+import sqlalchemy as sa
+
+
+def upgrade():
+    # There can be data truncation here as LargeBinary can be smaller than the 
pickle 
+    # type.
+
+    # use batch_alter_table to support SQLite workaround
+    with op.batch_alter_table("xcom") as batch_op:
+        batch_op.alter_column('value', type_=sa.LargeBinary())
+
+
+def downgrade():
+    # use batch_alter_table to support SQLite workaround
+    with op.batch_alter_table("xcom") as batch_op:
+        batch_op.alter_column('value', type_=sa.PickleType(pickler=dill))

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4cf904cf/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index c15491c..0b82c56 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -50,7 +50,7 @@ from urllib.parse import urlparse
 
 from sqlalchemy import (
     Column, Integer, String, DateTime, Text, Boolean, ForeignKey, PickleType,
-    Index, Float)
+    Index, Float, LargeBinary)
 from sqlalchemy import func, or_, and_
 from sqlalchemy.ext.declarative import declarative_base, declared_attr
 from sqlalchemy.dialects.mysql import LONGTEXT
@@ -104,7 +104,7 @@ if 'mysql' in settings.SQL_ALCHEMY_CONN:
 else:
     LongText = Text
 
-# used by DAG context_managers
+# Used by DAG context_managers
 _CONTEXT_MANAGER_DAG = None
 
 
@@ -3921,7 +3921,7 @@ class XCom(Base):
 
     id = Column(Integer, primary_key=True)
     key = Column(String(512))
-    value = Column(PickleType(pickler=dill))
+    value = Column(LargeBinary)
     timestamp = Column(
         DateTime, default=func.now(), nullable=False)
     execution_date = Column(DateTime, nullable=False)
@@ -3949,12 +3949,32 @@ class XCom(Base):
             execution_date,
             task_id,
             dag_id,
+            enable_pickling=None,
             session=None):
         """
         Store an XCom value.
+        TODO: "pickling" has been deprecated and JSON is preferred. "pickling" 
will be
+        removed in Airflow 2.0. :param enable_pickling: If pickling is not 
enabled, the
+        XCOM value will be parsed as JSON instead.
+        :return: None
         """
         session.expunge_all()
 
+        if enable_pickling is None:
+            enable_pickling = configuration.getboolean('core', 
'enable_xcom_pickling')
+
+        if enable_pickling:
+            value = pickle.dumps(value)
+        else:
+            try:
+                value = json.dumps(value).encode('UTF-8')
+            except ValueError:
+                logging.error("Could not serialize the XCOM value into JSON. "
+                              "If you are using pickles instead of JSON "
+                              "for XCOM, then you need to enable pickle "
+                              "support for XCOM in your airflow config.")
+                raise
+
         # remove any duplicate XComs
         session.query(cls).filter(
             cls.key == key,
@@ -3977,15 +3997,19 @@ class XCom(Base):
     @classmethod
     @provide_session
     def get_one(
-            cls,
-            execution_date,
-            key=None,
-            task_id=None,
-            dag_id=None,
-            include_prior_dates=False,
-            session=None):
-        """
-        Retrieve an XCom value, optionally meeting certain criteria
+                cls,
+                execution_date,
+                key=None,
+                task_id=None,
+                dag_id=None,
+                include_prior_dates=False,
+                enable_pickling=None,
+                session=None):
+        """
+        Retrieve an XCom value, optionally meeting certain criteria.
+        TODO: "pickling" has been deprecated and JSON is preferred. "pickling" 
will be removed in Airflow 2.0.
+        :param enable_pickling: If pickling is not enabled, the XCOM value 
will be parsed to JSON instead.
+        :return: XCom value
         """
         filters = []
         if key:
@@ -4001,27 +4025,41 @@ class XCom(Base):
 
         query = (
             session.query(cls.value)
-            .filter(and_(*filters))
-            .order_by(cls.execution_date.desc(), cls.timestamp.desc())
-            .limit(1))
+                .filter(and_(*filters))
+                .order_by(cls.execution_date.desc(), cls.timestamp.desc()))
 
         result = query.first()
         if result:
-            return result.value
+            if enable_pickling is None:
+                enable_pickling = configuration.getboolean('core', 
'enable_xcom_pickling')
+
+            if enable_pickling:
+                return pickle.loads(result.value)
+            else:
+                try:
+                    return json.loads(result.value.decode('UTF-8'))
+                except ValueError:
+                    logging.error("Could not serialize the XCOM value into 
JSON. "
+                                  "If you are using pickles instead of JSON "
+                                  "for XCOM, then you need to enable pickle "
+                                  "support for XCOM in your airflow config.")
+                    raise
 
     @classmethod
     @provide_session
     def get_many(
-            cls,
-            execution_date,
-            key=None,
-            task_ids=None,
-            dag_ids=None,
-            include_prior_dates=False,
-            limit=100,
-            session=None):
+                cls,
+                execution_date,
+                key=None,
+                task_ids=None,
+                dag_ids=None,
+                include_prior_dates=False,
+                limit=100,
+                enable_pickling=None,
+                session=None):
         """
         Retrieve an XCom value, optionally meeting certain criteria
+        TODO: "pickling" has been deprecated and JSON is preferred. "pickling" 
will be removed in Airflow 2.0.
         """
         filters = []
         if key:
@@ -4037,11 +4075,25 @@ class XCom(Base):
 
         query = (
             session.query(cls)
-            .filter(and_(*filters))
-            .order_by(cls.execution_date.desc(), cls.timestamp.desc())
-            .limit(limit))
-
-        return query.all()
+                .filter(and_(*filters))
+                .order_by(cls.execution_date.desc(), cls.timestamp.desc())
+                .limit(limit))
+        results = query.all()
+        if enable_pickling is None:
+            enable_pickling = configuration.getboolean('core', 
'enable_xcom_pickling')
+        for result in results:
+            if enable_pickling:
+                result.value = pickle.loads(result.value)
+            else:
+                try:
+                    result.value = json.loads(result.value.decode('UTF-8'))
+                except ValueError:
+                    logging.error("Could not serialize the XCOM value into 
JSON. "
+                                    "If you are using pickles instead of JSON "
+                                    "for XCOM, then you need to enable pickle "
+                                    "support for XCOM in your airflow config.")
+                    raise
+        return results
 
     @classmethod
     @provide_session

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4cf904cf/tests/contrib/operators/test_sftp_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_sftp_operator.py 
b/tests/contrib/operators/test_sftp_operator.py
index 3d31414..39e8d88 100644
--- a/tests/contrib/operators/test_sftp_operator.py
+++ b/tests/contrib/operators/test_sftp_operator.py
@@ -14,6 +14,7 @@
 
 import os
 import unittest
+from base64 import b64encode
 from datetime import datetime
 
 from airflow import configuration
@@ -60,7 +61,8 @@ class SFTPOperatorTest(unittest.TestCase):
         self.test_remote_filepath = '{0}/{1}'.format(self.test_dir,
                                                      self.test_remote_filename)
 
-    def test_file_transfer_put(self):
+    def test_pickle_file_transfer_put(self):
+        configuration.set("core", "enable_xcom_pickling", "True")
         test_local_file_content = \
             b"This is local file content \n which is multiline " \
             b"continuing....with other character\nanother line here \n this is 
last line"
@@ -96,7 +98,46 @@ class SFTPOperatorTest(unittest.TestCase):
                 ti3.xcom_pull(task_ids='test_check_file', 
key='return_value').strip(),
                 test_local_file_content)
 
-    def test_file_transfer_get(self):
+    def test_json_file_transfer_put(self):
+        configuration.set("core", "enable_xcom_pickling", "False")
+        test_local_file_content = \
+            b"This is local file content \n which is multiline " \
+            b"continuing....with other character\nanother line here \n this is 
last line"
+        # create a test file locally
+        with open(self.test_local_filepath, 'wb') as f:
+            f.write(test_local_file_content)
+
+        # put test file to remote
+        put_test_task = SFTPOperator(
+                task_id="test_sftp",
+                ssh_hook=self.hook,
+                local_filepath=self.test_local_filepath,
+                remote_filepath=self.test_remote_filepath,
+                operation=SFTPOperation.PUT,
+                dag=self.dag
+        )
+        self.assertIsNotNone(put_test_task)
+        ti2 = TaskInstance(task=put_test_task, execution_date=datetime.now())
+        ti2.run()
+
+        # check the remote file content
+        check_file_task = SSHOperator(
+                task_id="test_check_file",
+                ssh_hook=self.hook,
+                command="cat {0}".format(self.test_remote_filepath),
+                do_xcom_push=True,
+                dag=self.dag
+        )
+        self.assertIsNotNone(check_file_task)
+        ti3 = TaskInstance(task=check_file_task, execution_date=datetime.now())
+        ti3.run()
+        self.assertEqual(
+                ti3.xcom_pull(task_ids='test_check_file', 
key='return_value').strip(),
+                b64encode(test_local_file_content).decode('utf-8'))
+
+
+    def test_pickle_file_transfer_get(self):
+        configuration.set("core", "enable_xcom_pickling", "True")
         test_remote_file_content = \
             "This is remote file content \n which is also multiline " \
             "another line here \n this is last line. EOF"
@@ -133,6 +174,45 @@ class SFTPOperatorTest(unittest.TestCase):
             content_received = f.read()
         self.assertEqual(content_received.strip(), test_remote_file_content)
 
+    def test_json_file_transfer_get(self):
+        configuration.set("core", "enable_xcom_pickling", "False")
+        test_remote_file_content = \
+            "This is remote file content \n which is also multiline " \
+            "another line here \n this is last line. EOF"
+
+        # create a test file remotely
+        create_file_task = SSHOperator(
+                task_id="test_create_file",
+                ssh_hook=self.hook,
+                command="echo '{0}' > {1}".format(test_remote_file_content,
+                                                  self.test_remote_filepath),
+                do_xcom_push=True,
+                dag=self.dag
+        )
+        self.assertIsNotNone(create_file_task)
+        ti1 = TaskInstance(task=create_file_task, 
execution_date=datetime.now())
+        ti1.run()
+
+        # get remote file to local
+        get_test_task = SFTPOperator(
+                task_id="test_sftp",
+                ssh_hook=self.hook,
+                local_filepath=self.test_local_filepath,
+                remote_filepath=self.test_remote_filepath,
+                operation=SFTPOperation.GET,
+                dag=self.dag
+        )
+        self.assertIsNotNone(get_test_task)
+        ti2 = TaskInstance(task=get_test_task, execution_date=datetime.now())
+        ti2.run()
+
+        # test the received content
+        content_received = None
+        with open(self.test_local_filepath, 'r') as f:
+            content_received = f.read()
+        self.assertEqual(content_received.strip(),
+            test_remote_file_content.encode('utf-8').decode('utf-8'))
+
     def delete_local_resource(self):
         if os.path.exists(self.test_local_filepath):
             os.remove(self.test_local_filepath)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4cf904cf/tests/contrib/operators/test_ssh_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_ssh_operator.py 
b/tests/contrib/operators/test_ssh_operator.py
index 21433d3..f205b97 100644
--- a/tests/contrib/operators/test_ssh_operator.py
+++ b/tests/contrib/operators/test_ssh_operator.py
@@ -13,6 +13,7 @@
 # limitations under the License.
 
 import unittest
+from base64 import b64encode
 from datetime import datetime
 
 from airflow import configuration
@@ -51,7 +52,27 @@ class SSHOperatorTest(unittest.TestCase):
         self.hook = hook
         self.dag = dag
 
-    def test_command_execution(self):
+    def test_json_command_execution(self):
+        configuration.set("core", "enable_xcom_pickling", "False")
+        task = SSHOperator(
+                task_id="test",
+                ssh_hook=self.hook,
+                command="echo -n airflow",
+                do_xcom_push=True,
+                dag=self.dag,
+        )
+
+        self.assertIsNotNone(task)
+
+        ti = TaskInstance(
+                task=task, execution_date=datetime.now())
+        ti.run()
+        self.assertIsNotNone(ti.duration)
+        self.assertEqual(ti.xcom_pull(task_ids='test', key='return_value'),
+                         b64encode(b'airflow').decode('utf-8'))
+
+    def test_pickle_command_execution(self):
+        configuration.set("core", "enable_xcom_pickling", "True")
         task = SSHOperator(
                 task_id="test",
                 ssh_hook=self.hook,
@@ -69,6 +90,7 @@ class SSHOperatorTest(unittest.TestCase):
         self.assertEqual(ti.xcom_pull(task_ids='test', key='return_value'), 
b'airflow')
 
     def test_command_execution_with_env(self):
+        configuration.set("core", "enable_xcom_pickling", "True")
         task = SSHOperator(
             task_id="test",
             ssh_hook=self.hook,

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4cf904cf/tests/models.py
----------------------------------------------------------------------
diff --git a/tests/models.py b/tests/models.py
index d1621ad..266e036 100644
--- a/tests/models.py
+++ b/tests/models.py
@@ -30,6 +30,7 @@ from airflow.models import DAG, TaskInstance as TI
 from airflow.models import State as ST
 from airflow.models import DagModel, DagStat
 from airflow.models import clear_task_instances
+from airflow.models import XCom
 from airflow.operators.dummy_operator import DummyOperator
 from airflow.operators.bash_operator import BashOperator
 from airflow.operators.python_operator import PythonOperator
@@ -1352,3 +1353,89 @@ class ClearTasksTest(unittest.TestCase):
         self.assertEqual(ti2.try_number, 1)
         # try_number (0) + retries(1)
         self.assertEqual(ti2.max_tries, 1)
+
+    def test_xcom_disable_pickle_type(self):
+        json_obj = {"key": "value"}
+        execution_date = datetime.datetime.now()
+        key = "xcom_test1"
+        dag_id = "test_dag1"
+        task_id = "test_task1"
+
+        XCom.set(key=key,
+                 value=json_obj,
+                 dag_id=dag_id,
+                 task_id=task_id,
+                 execution_date=execution_date,
+                 enable_pickling=False)
+
+        ret_value = XCom.get_one(key=key,
+                 dag_id=dag_id,
+                 task_id=task_id,
+                 execution_date=execution_date,
+                 enable_pickling=False)
+
+        self.assertEqual(ret_value, json_obj)
+
+    def test_xcom_enable_pickle_type(self):
+        json_obj = {"key": "value"}
+        execution_date = datetime.datetime.now()
+        key = "xcom_test2"
+        dag_id = "test_dag2"
+        task_id = "test_task2"
+
+        XCom.set(key=key,
+                 value=json_obj,
+                 dag_id=dag_id,
+                 task_id=task_id,
+                 execution_date=execution_date,
+                 enable_pickling=True)
+
+        ret_value = XCom.get_one(key=key,
+                 dag_id=dag_id,
+                 task_id=task_id,
+                 execution_date=execution_date,
+                 enable_pickling=True)
+
+        self.assertEqual(ret_value, json_obj)
+
+    def test_xcom_disable_pickle_type_fail_on_non_json(self):
+        class PickleRce(object):
+            def __reduce__(self):
+                return (os.system, ("ls -alt",))
+        self.assertRaises(TypeError, XCom.set,
+                          key="xcom_test3",
+                          value=PickleRce(),
+                          dag_id="test_dag3",
+                          task_id="test_task3",
+                          execution_date=datetime.datetime.now(),
+                          enable_pickling=False)
+
+    def test_xcom_get_many(self):
+        json_obj = {"key": "value"}
+        execution_date = datetime.datetime.now()
+        key = "xcom_test4"
+        dag_id1 = "test_dag4"
+        task_id1 = "test_task4"
+        dag_id2 = "test_dag5"
+        task_id2 = "test_task5"
+
+        XCom.set(key=key,
+                 value=json_obj,
+                 dag_id=dag_id1,
+                 task_id=task_id1,
+                 execution_date=execution_date,
+                 enable_pickling=True)
+
+        XCom.set(key=key,
+                 value=json_obj,
+                 dag_id=dag_id2,
+                 task_id=task_id2,
+                 execution_date=execution_date,
+                 enable_pickling=True)
+
+        results = XCom.get_many(key=key,
+                                execution_date=execution_date,
+                                enable_pickling=True)
+
+        for result in results:
+            self.assertEqual(result.value, json_obj)

Reply via email to