mik-laj commented on a change in pull request #10032:
URL: https://github.com/apache/airflow/pull/10032#discussion_r462184492



##########
File path: airflow/providers/google/cloud/example_dags/example_datastore.py
##########
@@ -37,20 +40,116 @@
     "example_gcp_datastore",
     schedule_interval=None,  # Override to match your needs
     start_date=dates.days_ago(1),
-    tags=['example'],
+    tags=["example"],
 ) as dag:
+    # [START how_to_export_task]
     export_task = CloudDatastoreExportEntitiesOperator(
         task_id="export_task",
         bucket=BUCKET,
         project_id=GCP_PROJECT_ID,
         overwrite_existing=True,
     )
+    # [END how_to_export_task]
 
+    # [START how_to_import_task]
     import_task = CloudDatastoreImportEntitiesOperator(
         task_id="import_task",
         bucket="{{ 
task_instance.xcom_pull('export_task')['response']['outputUrl'].split('/')[2] 
}}",
         file="{{ 
'/'.join(task_instance.xcom_pull('export_task')['response']['outputUrl'].split('/')[3:])
 }}",
-        project_id=GCP_PROJECT_ID
+        project_id=GCP_PROJECT_ID,
     )
+    # [END how_to_import_task]
 
     export_task >> import_task
+
+# [START how_to_keys_def]
+KEYS = [
+    {
+        "partitionId": {"projectId": GCP_PROJECT_ID, "namespaceId": ""},
+        "path": {"kind": "airflow"},
+    }
+]
+# [END how_to_keys_def]
+
+# [START how_to_transaction_def]
+TRANSACTION_OPTIONS: Dict[str, Any] = {"readWrite": {}}
+# [END how_to_transaction_def]
+
+# [START how_to_commit_def]
+COMMIT_BODY = {
+    "mode": "TRANSACTIONAL",
+    "mutations": [
+        {
+            "insert": {
+                "key": KEYS[0],
+                "properties": {"string": {"stringValue": "airflow is 
awesome!"}},
+            }
+        }
+    ],
+    "transaction": "{{ task_instance.xcom_pull('begin_transaction_commit') }}",
+}
+# [END how_to_commit_def]
+
+# [START how_to_query_def]
+QUERY = {
+    "partitionId": {"projectId": GCP_PROJECT_ID, "namespaceId": ""},
+    "readOptions": {
+        "transaction": "{{ task_instance.xcom_pull('begin_transaction_query') 
}}"
+    },
+    "query": {},
+}
+# [END how_to_query_def]
+
+with models.DAG(
+    "example_gcp_datastore_operations",
+    start_date=dates.days_ago(1),
+    schedule_interval=None,  # Override to match your needs
+    tags=["example"],
+) as dag2:
+    # [START how_to_allocate_ids]
+    allocate_ids = CloudDatastoreAllocateIdsOperator(
+        task_id="allocate_ids", partial_keys=KEYS, project_id=GCP_PROJECT_ID
+    )
+    # [END how_to_allocate_ids]
+
+    # [START how_to_begin_transaction]
+    begin_transaction_commit = CloudDatastoreBeginTransactionOperator(
+        task_id="begin_transaction_commit",
+        transaction_options=TRANSACTION_OPTIONS,
+        project_id=GCP_PROJECT_ID,
+    )
+    # [END how_to_begin_transaction]
+
+    # [START how_to_commit_task]
+    commit_task = CloudDatastoreCommitOperator(
+        task_id="commit_task", body=COMMIT_BODY, project_id=GCP_PROJECT_ID
+    )
+    # [END how_to_commit_task]
+
+    allocate_ids >> begin_transaction_commit >> commit_task
+
+    begin_transaction_query = CloudDatastoreBeginTransactionOperator(
+        task_id="begin_transaction_query",
+        transaction_options=TRANSACTION_OPTIONS,
+        project_id=GCP_PROJECT_ID,
+    )
+
+    # [START how_to_run_query]
+    run_query = CloudDatastoreRunQueryOperator(
+        task_id="run_query", body=QUERY, project_id=GCP_PROJECT_ID
+    )
+    # [END how_to_run_query]
+
+    allocate_ids >> begin_transaction_query >> run_query
+
+    begin_transaction_to_rollback = CloudDatastoreBeginTransactionOperator(
+        task_id="begin_transaction_to_rollback",
+        transaction_options=TRANSACTION_OPTIONS,
+        project_id=GCP_PROJECT_ID,
+    )
+
+    # [START how_to_rollback_transaction]
+    rollback_transaction = CloudDatastoreRollbackOperator(
+        task_id="rollback_transaction", 
transaction=begin_transaction_to_rollback.output

Review comment:
       This is not available in Airflow 1.10, so you cannot use it in the 
backport packages without running bowler on them first.

##########
File path: airflow/providers/google/cloud/operators/datastore.py
##########
@@ -191,3 +199,287 @@ def execute(self, context):
             raise AirflowException('Operation failed: 
result={}'.format(result))
 
         return result
+
+
+class CloudDatastoreAllocateIdsOperator(BaseOperator):
+    """
+    Allocate IDs for incomplete keys. Return list of keys.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the 
guide:
+        :ref:`howto/operator:CloudDatastoreAllocateIdsOperator`
+
+    .. seealso::
+        
https://cloud.google.com/datastore/docs/reference/rest/v1/projects/allocateIds
+
+    :param partial_keys: a list of partial keys.
+    :type partial_keys: list
+    :param project_id: Google Cloud Platform project ID against which to make 
the request.
+    :type project_id: str
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud 
Platform.
+    :type gcp_conn_id: str
+    """
+    template_fields = ("partial_keys",)
+
+    @apply_defaults
+    def __init__(
+        self,
+        partial_keys: List,
+        gcp_conn_id: str = 'google_cloud_default',
+        project_id: Optional[str] = None,
+        *args,

Review comment:
       ```suggestion
   ```

##########
File path: airflow/providers/google/cloud/operators/datastore.py
##########
@@ -191,3 +199,287 @@ def execute(self, context):
             raise AirflowException('Operation failed: 
result={}'.format(result))
 
         return result
+
+
+class CloudDatastoreAllocateIdsOperator(BaseOperator):
+    """
+    Allocate IDs for incomplete keys. Return list of keys.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the 
guide:
+        :ref:`howto/operator:CloudDatastoreAllocateIdsOperator`
+
+    .. seealso::
+        
https://cloud.google.com/datastore/docs/reference/rest/v1/projects/allocateIds
+
+    :param partial_keys: a list of partial keys.
+    :type partial_keys: list
+    :param project_id: Google Cloud Platform project ID against which to make 
the request.
+    :type project_id: str
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud 
Platform.
+    :type gcp_conn_id: str
+    """
+    template_fields = ("partial_keys",)
+
+    @apply_defaults
+    def __init__(
+        self,
+        partial_keys: List,

Review comment:
       ```suggestion
           *,
           partial_keys: List,
   ```

##########
File path: airflow/providers/google/cloud/operators/datastore.py
##########
@@ -191,3 +199,287 @@ def execute(self, context):
             raise AirflowException('Operation failed: 
result={}'.format(result))
 
         return result
+
+
+class CloudDatastoreAllocateIdsOperator(BaseOperator):
+    """
+    Allocate IDs for incomplete keys. Return list of keys.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the 
guide:
+        :ref:`howto/operator:CloudDatastoreAllocateIdsOperator`
+
+    .. seealso::
+        
https://cloud.google.com/datastore/docs/reference/rest/v1/projects/allocateIds
+
+    :param partial_keys: a list of partial keys.
+    :type partial_keys: list
+    :param project_id: Google Cloud Platform project ID against which to make 
the request.
+    :type project_id: str
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud 
Platform.
+    :type gcp_conn_id: str
+    """
+    template_fields = ("partial_keys",)
+
+    @apply_defaults
+    def __init__(
+        self,
+        partial_keys: List,
+        gcp_conn_id: str = 'google_cloud_default',
+        project_id: Optional[str] = None,
+        *args,
+        **kwargs
+    ) -> None:
+        super().__init__(*args, **kwargs)
+
+        self.partial_keys = partial_keys
+        self.gcp_conn_id = gcp_conn_id
+        self.project_id = project_id
+
+    def execute(self, context):
+        hook = DatastoreHook(gcp_conn_id=self.gcp_conn_id)

Review comment:
       Should we also pass `delegate_to` here?

##########
File path: airflow/providers/google/cloud/operators/datastore.py
##########
@@ -191,3 +199,287 @@ def execute(self, context):
             raise AirflowException('Operation failed: 
result={}'.format(result))
 
         return result
+
+
+class CloudDatastoreAllocateIdsOperator(BaseOperator):
+    """
+    Allocate IDs for incomplete keys. Return list of keys.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the 
guide:
+        :ref:`howto/operator:CloudDatastoreAllocateIdsOperator`
+
+    .. seealso::
+        
https://cloud.google.com/datastore/docs/reference/rest/v1/projects/allocateIds
+
+    :param partial_keys: a list of partial keys.
+    :type partial_keys: list
+    :param project_id: Google Cloud Platform project ID against which to make 
the request.
+    :type project_id: str
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud 
Platform.
+    :type gcp_conn_id: str
+    """
+    template_fields = ("partial_keys",)
+
+    @apply_defaults
+    def __init__(
+        self,
+        partial_keys: List,
+        gcp_conn_id: str = 'google_cloud_default',
+        project_id: Optional[str] = None,

Review comment:
       It doesn't make much difference, but gcp_conn_id should be the last 
parameter. I would have ignored it, but now the order of the parameters doesn't 
match the documentation.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to