[ 
https://issues.apache.org/jira/browse/BEAM-6553?focusedWorklogId=196054&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-196054
 ]

ASF GitHub Bot logged work on BEAM-6553:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 08/Feb/19 00:49
            Start Date: 08/Feb/19 00:49
    Worklog Time Spent: 10m 
      Work Description: pabloem commented on pull request #7655: [BEAM-6553] A 
Python SDK sink that supports File Loads into BQ
URL: https://github.com/apache/beam/pull/7655#discussion_r254903154
 
 

 ##########
 File path: sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
 ##########
 @@ -0,0 +1,498 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unit tests for BigQuery file loads utilities."""
+
+from __future__ import absolute_import
+
+import json
+import logging
+import os
+import random
+import time
+import unittest
+
+import mock
+from hamcrest.core import assert_that as hamcrest_assert
+from hamcrest.core.core.allof import all_of
+from hamcrest.core.core.is_ import is_
+from nose.plugins.attrib import attr
+
+import apache_beam as beam
+from apache_beam.io.filebasedsink_test import _TestCaseWithTempDirCleanUp
+from apache_beam.io.gcp import bigquery_file_loads as bqfl
+from apache_beam.io.gcp import bigquery
+from apache_beam.io.gcp import bigquery_tools
+from apache_beam.io.gcp.internal.clients import bigquery as bigquery_api
+from apache_beam.io.gcp.tests.bigquery_matcher import BigqueryFullResultMatcher
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+
+try:
+  from apitools.base.py.exceptions import HttpError
+except ImportError:
+  HttpError = None
+
+
+_DESTINATION_ELEMENT_PAIRS = [
+    # DESTINATION 1
+    ('project1:dataset1.table1', '{"name":"beam", "language":"py"}'),
+    ('project1:dataset1.table1', '{"name":"beam", "language":"java"}'),
+    ('project1:dataset1.table1', '{"name":"beam", "language":"go"}'),
+    ('project1:dataset1.table1', '{"name":"flink", "language":"java"}'),
+    ('project1:dataset1.table1', '{"name":"flink", "language":"scala"}'),
+
+    # DESTINATION 3
+    ('project1:dataset1.table3', '{"name":"spark", "language":"scala"}'),
+
+    # DESTINATION 1
+    ('project1:dataset1.table1', '{"name":"spark", "language":"py"}'),
+    ('project1:dataset1.table1', '{"name":"spark", "language":"scala"}'),
+
+    # DESTINATION 2
+    ('project1:dataset1.table2', '{"name":"beam", "foundation":"apache"}'),
+    ('project1:dataset1.table2', '{"name":"flink", "foundation":"apache"}'),
+    ('project1:dataset1.table2', '{"name":"spark", "foundation":"apache"}'),
+]
+
+_NAME_LANGUAGE_ELEMENTS = [
+    json.loads(elm[1])
+    for elm in _DESTINATION_ELEMENT_PAIRS if "language" in elm[1]
+]
+
+
+_DISTINCT_DESTINATIONS = list(
+    set([elm[0] for elm in _DESTINATION_ELEMENT_PAIRS]))
+
+
+_ELEMENTS = list([json.loads(elm[1]) for elm in _DESTINATION_ELEMENT_PAIRS])
+
+
[email protected](HttpError is None, 'GCP dependencies are not installed')
+class TestWriteRecordsToFile(_TestCaseWithTempDirCleanUp):
+  maxDiff = None
+
+  def _consume_input(self, fn, checks=None):
+    if checks is None:
+      return
+
+    with TestPipeline() as p:
+      output_pcs = (
+          p
+          | beam.Create(_DESTINATION_ELEMENT_PAIRS)
+          | beam.ParDo(fn, self.tmpdir)
+          .with_outputs(fn.WRITTEN_FILE_TAG, fn.UNWRITTEN_RECORD_TAG))
+
+      checks(output_pcs)
+      return output_pcs
+
+  def test_files_created(self):
+    """Test that the files are created and written."""
+
+    fn = bqfl.WriteRecordsToFile()
+    self.tmpdir = self._new_tempdir()
+
+    def check_files_created(output_pcs):
+      dest_file_pc = output_pcs[bqfl.WriteRecordsToFile.WRITTEN_FILE_TAG]
+
+      files = dest_file_pc | "GetFiles" >> beam.Map(lambda x: x[1])
+      file_count = files | "CountFiles" >> beam.combiners.Count.Globally()
+
+      _ = files | "FilesExist" >> beam.Map(
+          lambda x: hamcrest_assert(os.path.exists(x), is_(True)))
+      assert_that(file_count, equal_to([3]), label='check file count')
+
+      destinations = dest_file_pc | "GetDests" >> beam.Map(lambda x: x[0])
+      assert_that(destinations, equal_to(list(_DISTINCT_DESTINATIONS)),
+                  label='check destinations ')
+
+    self._consume_input(fn, check_files_created)
+
+  def test_many_files(self):
+    """Forces records to be written to many files.
+
+    For each destination multiple files are necessary. This is because the max
+    file length is very small, so only a couple records fit in each file.
+    """
+
+    fn = bqfl.WriteRecordsToFile(max_file_size=50)
+    self.tmpdir = self._new_tempdir()
+
+    def check_many_files(output_pcs):
+      dest_file_pc = output_pcs[bqfl.WriteRecordsToFile.WRITTEN_FILE_TAG]
+
+      files_per_dest = (dest_file_pc
 
 Review comment:
   These are unittest-type test pipelines. They run locally.
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 196054)

> A BigQuery sink thta is SDK-implemented and supports file loads in Python
> -------------------------------------------------------------------------
>
>                 Key: BEAM-6553
>                 URL: https://issues.apache.org/jira/browse/BEAM-6553
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-py-core
>            Reporter: Pablo Estrada
>            Assignee: Pablo Estrada
>            Priority: Major
>              Labels: triaged
>          Time Spent: 6h 20m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to