Repository: nifi
Updated Branches:
  refs/heads/master f6b171d5f -> 03ef64654


NIFI-4731: BQ Processors and GCP library update.

Signed-off-by: joewitt <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/444caf8a
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/444caf8a
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/444caf8a

Branch: refs/heads/master
Commit: 444caf8a788c8c0a21c45558bb16cd8ee4b42872
Parents: f6b171d
Author: Daniel Jimenez <[email protected]>
Authored: Sun Apr 29 10:01:34 2018 +0200
Committer: joewitt <[email protected]>
Committed: Wed Nov 21 10:27:41 2018 -0500

----------------------------------------------------------------------
 .../nifi-gcp-bundle/nifi-gcp-processors/pom.xml |  10 +-
 .../processors/gcp/AbstractGCPProcessor.java    |   3 +-
 .../gcp/bigquery/AbstractBigQueryProcessor.java | 122 +++++++++
 .../gcp/bigquery/BigQueryAttributes.java        |  80 ++++++
 .../nifi/processors/gcp/bigquery/BqUtils.java   |  84 ++++++
 .../gcp/bigquery/PutBigQueryBatch.java          | 269 +++++++++++++++++++
 .../gcp/storage/AbstractGCSProcessor.java       |  36 +--
 .../processors/gcp/storage/ListGCSBucket.java   |  11 +-
 .../org.apache.nifi.processor.Processor         |   3 +-
 .../processors/gcp/bigquery/AbstractBQTest.java |  96 +++++++
 .../gcp/bigquery/AbstractBigQueryIT.java        |  79 ++++++
 .../gcp/bigquery/PutBigQueryBatchIT.java        | 137 ++++++++++
 .../gcp/bigquery/PutBigQueryBatchTest.java      | 153 +++++++++++
 .../gcp/storage/ListGCSBucketTest.java          |   1 -
 .../nifi-gcp-services-api/pom.xml               |   1 +
 15 files changed, 1060 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/444caf8a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml
index 669cbc6..24c60bf 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml
@@ -62,7 +62,7 @@
         </dependency>
         <dependency>
             <groupId>com.google.cloud</groupId>
-            <artifactId>google-cloud-storage</artifactId>
+            <artifactId>google-cloud-core</artifactId>
             <exclusions>
                 <exclusion>
                     <groupId>com.google.code.findbugs</groupId>
@@ -72,6 +72,14 @@
         </dependency>
         <dependency>
             <groupId>com.google.cloud</groupId>
+            <artifactId>google-cloud-storage</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.google.cloud</groupId>
+            <artifactId>google-cloud-bigquery</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.google.cloud</groupId>
             <artifactId>google-cloud-pubsub</artifactId>
         </dependency>
         <dependency>

http://git-wip-us.apache.org/repos/asf/nifi/blob/444caf8a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/AbstractGCPProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/AbstractGCPProcessor.java
 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/AbstractGCPProcessor.java
index 2c178ff..e95bf73 100644
--- 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/AbstractGCPProcessor.java
+++ 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/AbstractGCPProcessor.java
@@ -43,7 +43,8 @@ public abstract class AbstractGCPProcessor<
             .Builder().name("gcp-project-id")
             .displayName("Project ID")
             .description("Google Cloud Project ID")
-            .required(true)
+            .required(false)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .build();
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/444caf8a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryProcessor.java
 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryProcessor.java
new file mode 100644
index 0000000..b52a552
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryProcessor.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.gcp.bigquery;
+
+import com.google.api.gax.retrying.RetrySettings;
+import com.google.auth.oauth2.GoogleCredentials;
+import com.google.cloud.bigquery.BigQuery;
+import com.google.cloud.bigquery.BigQueryOptions;
+import com.google.common.collect.ImmutableList;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.gcp.AbstractGCPProcessor;
+import org.apache.nifi.util.StringUtils;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Base class for creating processors that connect to GCP BigQuery service
+ */
+public abstract class AbstractBigQueryProcessor extends 
AbstractGCPProcessor<BigQuery, BigQueryOptions> {
+    static final int BUFFER_SIZE = 65536;
+    public static final Relationship REL_SUCCESS =
+            new Relationship.Builder().name("success")
+                    .description("FlowFiles are routed to this relationship 
after a successful Google BigQuery operation.")
+                    .build();
+    public static final Relationship REL_FAILURE =
+            new Relationship.Builder().name("failure")
+                    .description("FlowFiles are routed to this relationship if 
the Google BigQuery operation fails.")
+                    .build();
+
+    public static final Set<Relationship> relationships = 
Collections.unmodifiableSet(
+            new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE)));
+
+    public static final PropertyDescriptor DATASET = new PropertyDescriptor
+            .Builder().name(BigQueryAttributes.DATASET_ATTR)
+            .displayName("Dataset")
+            .description(BigQueryAttributes.DATASET_DESC)
+            .required(true)
+            .defaultValue("${" + BigQueryAttributes.DATASET_ATTR + "}")
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor
+            .Builder().name(BigQueryAttributes.TABLE_NAME_ATTR)
+            .displayName("Table Name")
+            .description(BigQueryAttributes.TABLE_NAME_DESC)
+            .required(true)
+            .defaultValue("${" + BigQueryAttributes.TABLE_NAME_ATTR + "}")
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor TABLE_SCHEMA = new 
PropertyDescriptor
+            .Builder().name(BigQueryAttributes.TABLE_SCHEMA_ATTR)
+            .displayName("Table Schema")
+            .description(BigQueryAttributes.TABLE_SCHEMA_DESC)
+            .required(false)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor READ_TIMEOUT = new 
PropertyDescriptor
+            .Builder().name(BigQueryAttributes.JOB_READ_TIMEOUT_ATTR)
+            .displayName("Read Timeout")
+            .description(BigQueryAttributes.JOB_READ_TIMEOUT_DESC)
+            .required(true)
+            .defaultValue("5 minutes")
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .build();
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return ImmutableList.<PropertyDescriptor>builder()
+                .addAll(super.getSupportedPropertyDescriptors())
+                .build();
+    }
+
+    @Override
+    protected BigQueryOptions getServiceOptions(ProcessContext context, 
GoogleCredentials credentials) {
+        final String projectId = 
context.getProperty(PROJECT_ID).evaluateAttributeExpressions().getValue();
+        final Integer retryCount = 
Integer.valueOf(context.getProperty(RETRY_COUNT).getValue());
+
+        BigQueryOptions.Builder builder = 
BigQueryOptions.newBuilder().setCredentials(credentials);
+
+        if (!StringUtils.isBlank(projectId)) {
+            builder.setProjectId(projectId);
+        }
+
+        return builder
+                
.setRetrySettings(RetrySettings.newBuilder().setMaxAttempts(retryCount).build())
+                .build();
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/444caf8a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryAttributes.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryAttributes.java
 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryAttributes.java
new file mode 100644
index 0000000..380f701
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryAttributes.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.gcp.bigquery;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import 
org.apache.nifi.processors.gcp.credentials.factory.CredentialPropertyDescriptors;
+
+/**
+ * Attributes associated with the BigQuery processors
+ */
+public class BigQueryAttributes {
+    private BigQueryAttributes() {}
+
+    public static final PropertyDescriptor SERVICE_ACCOUNT_JSON_FILE = 
CredentialPropertyDescriptors.SERVICE_ACCOUNT_JSON_FILE;
+
+    public static final String DATASET_ATTR = "bq.dataset";
+    public static final String DATASET_DESC = "BigQuery dataset";
+
+    public static final String TABLE_NAME_ATTR = "bq.table.name";
+    public static final String TABLE_NAME_DESC = "BigQuery table name";
+
+    public static final String TABLE_SCHEMA_ATTR = "bq.table.schema";
+    public static final String TABLE_SCHEMA_DESC = "BigQuery table name";
+
+    public static final String CREATE_DISPOSITION_ATTR = 
"bq.load.create_disposition";
+    public static final String CREATE_DISPOSITION_DESC = "Options for table 
creation";
+
+    public static final String JOB_ERROR_MSG_ATTR = "bq.error.message";
+    public static final String JOB_ERROR_MSG_DESC = "Load job error message";
+
+    public static final String JOB_ERROR_REASON_ATTR = "bq.error.reason";
+    public static final String JOB_ERROR_REASON_DESC = "Load job error reason";
+
+    public static final String JOB_ERROR_LOCATION_ATTR = "bq.error.location";
+    public static final String JOB_ERROR_LOCATION_DESC = "Load job error 
location";
+
+    public static final String JOB_READ_TIMEOUT_ATTR = "bq.readtimeout";
+    public static final String JOB_READ_TIMEOUT_DESC = "Load Job Time Out";
+
+
+    // Batch Attributes
+    public static final String SOURCE_TYPE_ATTR = "bq.load.type";
+    public static final String SOURCE_TYPE_DESC = "Data type of the file to be 
loaded";
+
+    public static final String IGNORE_UNKNOWN_ATTR = "bq.load.ignore_unknown";
+    public static final String IGNORE_UNKNOWN_DESC = "Ignore fields not in 
table schema";
+
+    public static final String WRITE_DISPOSITION_ATTR = 
"bq.load.write_disposition";
+    public static final String WRITE_DISPOSITION_DESC = "Options for writing 
to table";
+
+    public static final String MAX_BADRECORDS_ATTR = "bq.load.max_badrecords";
+    public static final String MAX_BADRECORDS_DESC = "Number of erroneous 
records to ignore before generating an error";
+
+    public static final String JOB_CREATE_TIME_ATTR = 
"bq.job.stat.creation_time";
+    public static final String JOB_CREATE_TIME_DESC = "Time load job creation";
+
+    public static final String JOB_END_TIME_ATTR = "bq.job.stat.end_time";
+    public static final String JOB_END_TIME_DESC = "Time load job ended";
+
+    public static final String JOB_START_TIME_ATTR = "bq.job.stat.start_time";
+    public static final String JOB_START_TIME_DESC = "Time load job started";
+
+    public static final String JOB_LINK_ATTR = "bq.job.link";
+    public static final String JOB_LINK_DESC = "API Link to load job";
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/444caf8a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BqUtils.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BqUtils.java
 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BqUtils.java
new file mode 100644
index 0000000..f7f5d66
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BqUtils.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.gcp.bigquery;
+
+import com.google.cloud.bigquery.Field;
+import com.google.cloud.bigquery.LegacySQLTypeName;
+import com.google.cloud.bigquery.Schema;
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
+
+import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+
+/**
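+ * Helpers that turn a JSON list of field maps (name/type/mode) into BigQuery Field and Schema objects.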
+ */
+public class BqUtils {
+    private final static Type gsonSchemaType = new TypeToken<List<Map>>() {
+    }.getType();
+
+    public static Field mapToField(Map fMap) {
+        String typeStr = fMap.get("type").toString();
+        String nameStr = fMap.get("name").toString();
+        String modeStr = fMap.get("mode").toString();
+        LegacySQLTypeName type = null;
+
+        if (typeStr.equals("BOOLEAN")) {
+            type = LegacySQLTypeName.BOOLEAN;
+        } else if (typeStr.equals("STRING")) {
+            type = LegacySQLTypeName.STRING;
+        } else if (typeStr.equals("BYTES")) {
+            type = LegacySQLTypeName.BYTES;
+        } else if (typeStr.equals("INTEGER")) {
+            type = LegacySQLTypeName.INTEGER;
+        } else if (typeStr.equals("FLOAT")) {
+            type = LegacySQLTypeName.FLOAT;
+        } else if (typeStr.equals("TIMESTAMP") || typeStr.equals("DATE")
+                || typeStr.equals("TIME") || typeStr.equals("DATETIME")) {
+            type = LegacySQLTypeName.TIMESTAMP;
+        } else if (typeStr.equals("RECORD")) {
+            type = LegacySQLTypeName.RECORD;
+        }
+
+        return Field.newBuilder(nameStr, 
type).setMode(Field.Mode.valueOf(modeStr)).build();
+    }
+
+    public static List<Field> listToFields(List<Map> m_fields) {
+        List<Field> fields = new ArrayList(m_fields.size());
+        for (Map m : m_fields) {
+            fields.add(mapToField(m));
+        }
+
+        return fields;
+    }
+
+    public static Schema schemaFromString(String schemaStr) {
+        if (schemaStr == null) {
+            return null;
+        } else {
+            Gson gson = new Gson();
+            List<Map> fields = gson.fromJson(schemaStr, gsonSchemaType);
+            return Schema.of(BqUtils.listToFields(fields));
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/444caf8a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java
 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java
new file mode 100644
index 0000000..99c7f2a
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.gcp.bigquery;
+
+import com.google.cloud.RetryOption;
+import com.google.cloud.bigquery.BigQuery;
+import com.google.cloud.bigquery.FormatOptions;
+import com.google.cloud.bigquery.Job;
+import com.google.cloud.bigquery.JobInfo;
+import com.google.cloud.bigquery.Schema;
+import com.google.cloud.bigquery.TableDataWriteChannel;
+import com.google.cloud.bigquery.TableId;
+import com.google.cloud.bigquery.WriteChannelConfiguration;
+import com.google.common.collect.ImmutableList;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.LogLevel;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.gcp.storage.DeleteGCSObject;
+import org.apache.nifi.processors.gcp.storage.PutGCSObject;
+import org.apache.nifi.util.StringUtils;
+import org.threeten.bp.Duration;
+import org.threeten.bp.temporal.ChronoUnit;
+
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A processor for batch loading data into a Google BigQuery table
+ */
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"google", "google cloud", "bq", "bigquery"})
+@CapabilityDescription("Batch loads flow files to a Google BigQuery table.")
+@SeeAlso({PutGCSObject.class, DeleteGCSObject.class})
+
+@WritesAttributes({
+        @WritesAttribute(attribute = BigQueryAttributes.DATASET_ATTR, 
description = BigQueryAttributes.DATASET_DESC),
+        @WritesAttribute(attribute = BigQueryAttributes.TABLE_NAME_ATTR, 
description = BigQueryAttributes.TABLE_NAME_DESC),
+        @WritesAttribute(attribute = BigQueryAttributes.TABLE_SCHEMA_ATTR, 
description = BigQueryAttributes.TABLE_SCHEMA_DESC),
+        @WritesAttribute(attribute = BigQueryAttributes.SOURCE_TYPE_ATTR, 
description = BigQueryAttributes.SOURCE_TYPE_DESC),
+        @WritesAttribute(attribute = BigQueryAttributes.IGNORE_UNKNOWN_ATTR, 
description = BigQueryAttributes.IGNORE_UNKNOWN_DESC),
+        @WritesAttribute(attribute = 
BigQueryAttributes.CREATE_DISPOSITION_ATTR, description = 
BigQueryAttributes.CREATE_DISPOSITION_DESC),
+        @WritesAttribute(attribute = 
BigQueryAttributes.WRITE_DISPOSITION_ATTR, description = 
BigQueryAttributes.WRITE_DISPOSITION_DESC),
+        @WritesAttribute(attribute = BigQueryAttributes.MAX_BADRECORDS_ATTR, 
description = BigQueryAttributes.MAX_BADRECORDS_DESC),
+        @WritesAttribute(attribute = BigQueryAttributes.JOB_CREATE_TIME_ATTR, 
description = BigQueryAttributes.JOB_CREATE_TIME_DESC),
+        @WritesAttribute(attribute = BigQueryAttributes.JOB_END_TIME_ATTR, 
description = BigQueryAttributes.JOB_END_TIME_DESC),
+        @WritesAttribute(attribute = BigQueryAttributes.JOB_START_TIME_ATTR, 
description = BigQueryAttributes.JOB_START_TIME_DESC),
+        @WritesAttribute(attribute = BigQueryAttributes.JOB_LINK_ATTR, 
description = BigQueryAttributes.JOB_LINK_DESC),
+        @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_MSG_ATTR, 
description = BigQueryAttributes.JOB_ERROR_MSG_DESC),
+        @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_REASON_ATTR, 
description = BigQueryAttributes.JOB_ERROR_REASON_DESC),
+        @WritesAttribute(attribute = 
BigQueryAttributes.JOB_ERROR_LOCATION_ATTR, description = 
BigQueryAttributes.JOB_ERROR_LOCATION_DESC)
+})
+
+public class PutBigQueryBatch extends AbstractBigQueryProcessor {
+
+    public static final PropertyDescriptor SOURCE_TYPE = new PropertyDescriptor
+            .Builder().name(BigQueryAttributes.SOURCE_TYPE_ATTR)
+            .displayName("Load file type")
+            .description(BigQueryAttributes.SOURCE_TYPE_DESC)
+            .required(true)
+            .allowableValues(FormatOptions.json().getType(), 
FormatOptions.avro().getType(), FormatOptions.csv().getType())
+            .defaultValue(FormatOptions.avro().getType())
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor IGNORE_UNKNOWN = new 
PropertyDescriptor.Builder()
+            .name(BigQueryAttributes.IGNORE_UNKNOWN_ATTR)
+            .displayName("Ignore Unknown Values")
+            .description(BigQueryAttributes.IGNORE_UNKNOWN_DESC)
+            .required(true)
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .allowableValues("true", "false")
+            .defaultValue("true")
+            .build();
+
+    public static final PropertyDescriptor CREATE_DISPOSITION = new 
PropertyDescriptor.Builder()
+            .name(BigQueryAttributes.CREATE_DISPOSITION_ATTR)
+            .displayName("Create Disposition")
+            .description(BigQueryAttributes.CREATE_DISPOSITION_DESC)
+            .required(true)
+            
.allowableValues(JobInfo.CreateDisposition.CREATE_IF_NEEDED.name(), 
JobInfo.CreateDisposition.CREATE_NEVER.name())
+            .defaultValue(JobInfo.CreateDisposition.CREATE_IF_NEEDED.name())
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor WRITE_DISPOSITION = new 
PropertyDescriptor.Builder()
+            .name(BigQueryAttributes.WRITE_DISPOSITION_ATTR)
+            .displayName("Write Disposition")
+            .description(BigQueryAttributes.WRITE_DISPOSITION_DESC)
+            .required(true)
+            .allowableValues(JobInfo.WriteDisposition.WRITE_EMPTY.name(), 
JobInfo.WriteDisposition.WRITE_APPEND.name(), 
JobInfo.WriteDisposition.WRITE_TRUNCATE.name())
+            .defaultValue(JobInfo.WriteDisposition.WRITE_EMPTY.name())
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor MAXBAD_RECORDS = new 
PropertyDescriptor.Builder()
+            .name(BigQueryAttributes.MAX_BADRECORDS_ATTR)
+            .displayName("Max Bad Records")
+            .description(BigQueryAttributes.MAX_BADRECORDS_DESC)
+            .required(true)
+            .defaultValue("0")
+            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+            .build();
+
+    private Schema schemaCache = null;
+
+    public PutBigQueryBatch() {
+
+    }
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return ImmutableList.<PropertyDescriptor>builder()
+                .addAll(super.getSupportedPropertyDescriptors())
+                .add(DATASET)
+                .add(TABLE_NAME)
+                .add(TABLE_SCHEMA)
+                .add(SOURCE_TYPE)
+                .add(CREATE_DISPOSITION)
+                .add(WRITE_DISPOSITION)
+                .add(MAXBAD_RECORDS)
+                .add(IGNORE_UNKNOWN)
+                .build();
+    }
+
+    @Override
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final 
String propertyDescriptorName) {
+        return new PropertyDescriptor.Builder()
+                .name(propertyDescriptorName)
+                .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+                .expressionLanguageSupported(true)
+                .dynamic(true)
+                .build();
+    }
+
+    @Override
+    @OnScheduled
+    public void onScheduled(ProcessContext context) {
+        super.onScheduled(context);
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final Map<String, String> attributes = new HashMap<>();
+
+        final BigQuery bq = getCloudService();
+
+        final String projectId = 
context.getProperty(PROJECT_ID).evaluateAttributeExpressions().getValue();
+        final String dataset = 
context.getProperty(DATASET).evaluateAttributeExpressions(flowFile).getValue();
+        final String tableName = 
context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
+
+        final TableId tableId;
+        if (StringUtils.isEmpty(projectId)) {
+            tableId = TableId.of(dataset, tableName);
+        } else {
+            tableId = TableId.of(projectId, dataset, tableName);
+        }
+
+        final String fileType = context.getProperty(SOURCE_TYPE).getValue();
+
+        String schemaString = 
context.getProperty(TABLE_SCHEMA).evaluateAttributeExpressions().getValue();
+        Schema schema = BqUtils.schemaFromString(schemaString);
+
+        WriteChannelConfiguration writeChannelConfiguration =
+                WriteChannelConfiguration.newBuilder(tableId)
+                        
.setCreateDisposition(JobInfo.CreateDisposition.valueOf(context.getProperty(CREATE_DISPOSITION).getValue()))
+                        
.setWriteDisposition(JobInfo.WriteDisposition.valueOf(context.getProperty(WRITE_DISPOSITION).getValue()))
+                        
.setIgnoreUnknownValues(context.getProperty(IGNORE_UNKNOWN).asBoolean())
+                        
.setMaxBadRecords(context.getProperty(MAXBAD_RECORDS).asInteger())
+                        .setSchema(schema)
+                        .setFormatOptions(FormatOptions.of(fileType))
+                        .build();
+
+        TableDataWriteChannel writer = bq.writer(writeChannelConfiguration);
+
+        try {
+            session.read(flowFile, rawIn -> {
+                ReadableByteChannel readableByteChannel = 
Channels.newChannel(rawIn);
+                ByteBuffer byteBuffer = ByteBuffer.allocateDirect(BUFFER_SIZE);
+                while (readableByteChannel.read(byteBuffer) >= 0) {
+                    byteBuffer.flip();
+                    writer.write(byteBuffer);
+                    byteBuffer.clear();
+                }
+            });
+
+            writer.close();
+
+            Job job = writer.getJob();
+            PropertyValue property = context.getProperty(READ_TIMEOUT);
+            Long timePeriod = 
property.evaluateAttributeExpressions().asTimePeriod(TimeUnit.SECONDS);
+            Duration duration = Duration.of(timePeriod, ChronoUnit.SECONDS);
+            job = job.waitFor(RetryOption.totalTimeout(duration));
+
+            if (job != null) {
+                attributes.put(BigQueryAttributes.JOB_CREATE_TIME_ATTR, 
Long.toString(job.getStatistics().getCreationTime()));
+                attributes.put(BigQueryAttributes.JOB_END_TIME_ATTR, 
Long.toString(job.getStatistics().getEndTime()));
+                attributes.put(BigQueryAttributes.JOB_START_TIME_ATTR, 
Long.toString(job.getStatistics().getStartTime()));
+                attributes.put(BigQueryAttributes.JOB_LINK_ATTR, 
job.getSelfLink());
+
+                boolean jobError = (job.getStatus().getError() != null);
+
+                if (jobError) {
+                    attributes.put(BigQueryAttributes.JOB_ERROR_MSG_ATTR, 
job.getStatus().getError().getMessage());
+                    attributes.put(BigQueryAttributes.JOB_ERROR_REASON_ATTR, 
job.getStatus().getError().getReason());
+                    attributes.put(BigQueryAttributes.JOB_ERROR_LOCATION_ATTR, 
job.getStatus().getError().getLocation());
+                } else {
+                    // clear stale error attributes in case the FlowFile was looped back after an earlier error
+                    flowFile = session.removeAttribute(flowFile, 
BigQueryAttributes.JOB_ERROR_MSG_ATTR);
+                    flowFile = session.removeAttribute(flowFile, 
BigQueryAttributes.JOB_ERROR_REASON_ATTR);
+                    flowFile = session.removeAttribute(flowFile, 
BigQueryAttributes.JOB_ERROR_LOCATION_ATTR);
+                }
+
+                if (!attributes.isEmpty()) {
+                    flowFile = session.putAllAttributes(flowFile, attributes);
+                }
+
+                if (jobError) {
+                    flowFile = session.penalize(flowFile);
+                    session.transfer(flowFile, REL_FAILURE);
+                } else {
+                    session.transfer(flowFile, REL_SUCCESS);
+                }
+            }
+
+        } catch (Exception ex) {
+            getLogger().log(LogLevel.ERROR, ex.getMessage(), ex);
+            flowFile = session.penalize(flowFile);
+            session.transfer(flowFile, REL_FAILURE);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/444caf8a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/AbstractGCSProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/AbstractGCSProcessor.java
 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/AbstractGCSProcessor.java
index 4363115..7c63631 100644
--- 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/AbstractGCSProcessor.java
+++ 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/AbstractGCSProcessor.java
@@ -16,12 +16,15 @@
  */
 package org.apache.nifi.processors.gcp.storage;
 
-import com.google.api.gax.retrying.RetrySettings;
-import com.google.auth.oauth2.GoogleCredentials;
-import com.google.cloud.http.HttpTransportOptions;
-import com.google.cloud.storage.Storage;
-import com.google.cloud.storage.StorageOptions;
-import com.google.common.collect.ImmutableList;
+import java.net.Proxy;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
@@ -31,14 +34,12 @@ import org.apache.nifi.processors.gcp.AbstractGCPProcessor;
 import org.apache.nifi.processors.gcp.ProxyAwareTransportFactory;
 import org.apache.nifi.proxy.ProxyConfiguration;
 
-import java.net.Proxy;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
+import com.google.api.gax.retrying.RetrySettings;
+import com.google.auth.oauth2.GoogleCredentials;
+import com.google.cloud.http.HttpTransportOptions;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.StorageOptions;
+import com.google.common.collect.ImmutableList;
 
 /**
  * Base class for creating processors which connect to Google Cloud Storage.
@@ -86,12 +87,11 @@ public abstract class AbstractGCSProcessor extends 
AbstractGCPProcessor<Storage,
 
     @Override
     protected StorageOptions getServiceOptions(ProcessContext context, 
GoogleCredentials credentials) {
-        final String projectId = context.getProperty(PROJECT_ID).getValue();
+        final String projectId = 
context.getProperty(PROJECT_ID).evaluateAttributeExpressions().getValue();
         final Integer retryCount = 
context.getProperty(RETRY_COUNT).asInteger();
 
         StorageOptions.Builder storageOptionsBuilder = 
StorageOptions.newBuilder()
                 .setCredentials(credentials)
-                .setProjectId(projectId)
                 .setRetrySettings(RetrySettings.newBuilder()
                         .setMaxAttempts(retryCount)
                         .build());
@@ -113,6 +113,10 @@ public abstract class AbstractGCSProcessor extends 
AbstractGCPProcessor<Storage,
             return ProxyConfiguration.DIRECT_CONFIGURATION;
         });
 
+        if (!projectId.isEmpty()) {
+            storageOptionsBuilder.setProjectId(projectId);
+        }
+
         final ProxyAwareTransportFactory transportFactory = new 
ProxyAwareTransportFactory(proxyConfiguration);
         
storageOptionsBuilder.setTransportOptions(HttpTransportOptions.newBuilder().setHttpTransportFactory(transportFactory).build());
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/444caf8a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java
 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java
index e018814..01293cf 100644
--- 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java
+++ 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java
@@ -150,8 +150,8 @@ public class ListGCSBucket extends AbstractGCSProcessor {
             .displayName("Bucket")
             .description(BUCKET_DESC)
             .required(true)
-            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
             .build();
 
     public static final PropertyDescriptor PREFIX = new 
PropertyDescriptor.Builder()
@@ -159,7 +159,8 @@ public class ListGCSBucket extends AbstractGCSProcessor {
             .displayName("Prefix")
             .description("The prefix used to filter the object list. In most 
cases, it should end with a forward slash ('/').")
             .required(false)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
             .build();
 
     public static final PropertyDescriptor USE_GENERATIONS = new 
PropertyDescriptor.Builder()
@@ -242,9 +243,9 @@ public class ListGCSBucket extends AbstractGCSProcessor {
 
         final long startNanos = System.nanoTime();
 
-        final String bucket = context.getProperty(BUCKET).getValue();
+        final String bucket = 
context.getProperty(BUCKET).evaluateAttributeExpressions().getValue();
 
-        final String prefix = context.getProperty(PREFIX).getValue();
+        final String prefix = 
context.getProperty(PREFIX).evaluateAttributeExpressions().getValue();
 
         final boolean useGenerations = 
context.getProperty(USE_GENERATIONS).asBoolean();
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/444caf8a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 249d19e..4ff9dfb 100644
--- 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -17,4 +17,5 @@ org.apache.nifi.processors.gcp.storage.FetchGCSObject
 org.apache.nifi.processors.gcp.storage.DeleteGCSObject
 org.apache.nifi.processors.gcp.storage.ListGCSBucket
 org.apache.nifi.processors.gcp.pubsub.PublishGCPubSub
-org.apache.nifi.processors.gcp.pubsub.ConsumeGCPubSub
\ No newline at end of file
+org.apache.nifi.processors.gcp.pubsub.ConsumeGCPubSub
+org.apache.nifi.processors.gcp.bigquery.PutBigQueryBatch
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/444caf8a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/AbstractBQTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/AbstractBQTest.java
 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/AbstractBQTest.java
new file mode 100644
index 0000000..e424a3a
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/AbstractBQTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.gcp.bigquery;
+
+
+import com.google.auth.oauth2.GoogleCredentials;
+import com.google.cloud.bigquery.BigQuery;
+import com.google.cloud.bigquery.BigQueryOptions;
+import com.google.cloud.bigquery.testing.RemoteBigQueryHelper;
+import org.apache.nifi.gcp.credentials.service.GCPCredentialsService;
+import org.apache.nifi.processor.Processor;
+import 
org.apache.nifi.processors.gcp.credentials.service.GCPCredentialsControllerService;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+
+/**
+ * Shared scaffolding for the BigQuery processor unit tests. It hands out a
+ * {@link TestRunner} that already has a credentials controller service, a project id
+ * and a retry count configured, and it contributes one inherited test that checks how
+ * those settings end up in the {@link BigQueryOptions} built by the processor.
+ */
+public abstract class AbstractBQTest {
+    private static final String PROJECT_ID = System.getProperty("test.gcp.project.id", "nifi-test-gcp-project");
+    private static final Integer RETRIES = 9;
+
+    static final String DATASET = RemoteBigQueryHelper.generateDatasetName();
+
+    /** BigQuery client mock; populated by {@link #setup()} before every test. */
+    @Mock
+    protected BigQuery bq;
+
+    /**
+     * @return the processor under test, as supplied by the concrete test class
+     */
+    public abstract AbstractBigQueryProcessor getProcessor();
+
+    /**
+     * Sets every processor-specific property the concrete processor needs on the given runner.
+     *
+     * @param runner the runner to configure
+     */
+    protected abstract void addRequiredPropertiesToRunner(TestRunner runner);
+
+    /**
+     * Initialises the {@code @Mock} annotated fields of this test instance.
+     */
+    @Before
+    public void setup() throws Exception {
+        MockitoAnnotations.initMocks(this);
+    }
+
+    /**
+     * Creates a runner for {@code processor} with the credentials service registered and
+     * enabled, and with the credentials-service, project-id and retry-count properties set.
+     *
+     * @param processor the processor the runner should drive
+     * @return the configured runner
+     */
+    public static TestRunner buildNewRunner(Processor processor) throws Exception {
+        final String serviceId = "gcpCredentialsControllerService";
+        final GCPCredentialsService credentials = new GCPCredentialsControllerService();
+
+        final TestRunner testRunner = TestRunners.newTestRunner(processor);
+        testRunner.addControllerService(serviceId, credentials);
+        testRunner.enableControllerService(credentials);
+
+        testRunner.setProperty(AbstractBigQueryProcessor.GCP_CREDENTIALS_PROVIDER_SERVICE, serviceId);
+        testRunner.setProperty(AbstractBigQueryProcessor.PROJECT_ID, PROJECT_ID);
+        testRunner.setProperty(AbstractBigQueryProcessor.RETRY_COUNT, String.valueOf(RETRIES));
+
+        testRunner.assertValid(credentials);
+        return testRunner;
+    }
+
+    /**
+     * Checks that the project id, retry count and credentials passed to
+     * {@code getServiceOptions} are the ones reported by the resulting options.
+     */
+    @Test
+    public void testBiqQueryOptionsConfiguration() throws Exception {
+        reset(bq);
+        final TestRunner testRunner = buildNewRunner(getProcessor());
+
+        final AbstractBigQueryProcessor underTest = getProcessor();
+        final GoogleCredentials credentials = mock(GoogleCredentials.class);
+
+        final BigQueryOptions built = underTest.getServiceOptions(testRunner.getProcessContext(), credentials);
+
+        assertEquals("Project IDs should match", PROJECT_ID, built.getProjectId());
+        assertEquals("Retry counts should match", RETRIES.intValue(), built.getRetrySettings().getMaxAttempts());
+        assertSame("Credentials should be configured correctly", credentials, built.getCredentials());
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/444caf8a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryIT.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryIT.java
 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryIT.java
new file mode 100644
index 0000000..52327d4
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryIT.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.gcp.bigquery;
+
+import com.google.cloud.bigquery.BigQuery;
+import com.google.cloud.bigquery.BigQueryOptions;
+import com.google.cloud.bigquery.Dataset;
+import com.google.cloud.bigquery.DatasetInfo;
+import com.google.cloud.bigquery.testing.RemoteBigQueryHelper;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processors.gcp.GCPIntegrationTests;
+import 
org.apache.nifi.processors.gcp.credentials.service.GCPCredentialsControllerService;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.TestRunner;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertNull;
+
+/**
+ * Base class for BigQuery integration tests. Before the tests of a class run it creates
+ * a dataset with a generated name through a {@link BigQuery} client built from default
+ * options, and it deletes that dataset (including its contents) afterwards.
+ */
+@Category(GCPIntegrationTests.class)
+public abstract class AbstractBigQueryIT {
+
+    static final String CONTROLLER_SERVICE = "GCPCredentialsService";
+    protected static BigQuery bigquery;
+    protected static Dataset dataset;
+    protected static TestRunner runner;
+
+    /**
+     * Builds the BigQuery client and creates the dataset shared by the tests of the class.
+     */
+    @BeforeClass
+    public static void beforeClass() {
+        dataset = null;
+        BigQueryOptions bigQueryOptions = BigQueryOptions.newBuilder()
+                .build();
+        bigquery = bigQueryOptions.getService();
+
+        DatasetInfo datasetInfo = DatasetInfo.newBuilder(RemoteBigQueryHelper.generateDatasetName()).build();
+        dataset = bigquery.create(datasetInfo);
+    }
+
+    /**
+     * Deletes the dataset created in {@link #beforeClass()}, together with its contents.
+     */
+    @AfterClass
+    public static void afterClass() {
+        // JUnit runs @AfterClass even when @BeforeClass threw. In that case there is
+        // nothing to delete, and dereferencing the unset fields would only add a
+        // NullPointerException on top of the real setup failure.
+        if (bigquery == null || dataset == null) {
+            return;
+        }
+        bigquery.delete(dataset.getDatasetId(), BigQuery.DatasetDeleteOption.deleteContents());
+    }
+
+    /**
+     * Asserts that none of the job error attributes is present on the flow file.
+     *
+     * @param flowFile the flow file to inspect
+     */
+    protected static void validateNoServiceExceptionAttribute(FlowFile flowFile) {
+        assertNull(flowFile.getAttribute(BigQueryAttributes.JOB_ERROR_MSG_ATTR));
+        assertNull(flowFile.getAttribute(BigQueryAttributes.JOB_ERROR_REASON_ATTR));
+        assertNull(flowFile.getAttribute(BigQueryAttributes.JOB_ERROR_LOCATION_ATTR));
+    }
+
+    /**
+     * Registers a {@link GCPCredentialsControllerService} under {@link #CONTROLLER_SERVICE}
+     * with no explicit properties, enables it and asserts that it is valid.
+     *
+     * @param runner the runner to register the service on
+     * @return the same runner, for chaining
+     * @throws InitializationException if the service cannot be added to the runner
+     */
+    protected TestRunner setCredentialsControllerService(TestRunner runner) throws InitializationException {
+        final Map<String, String> propertiesMap = new HashMap<>();
+        final GCPCredentialsControllerService credentialsControllerService = new GCPCredentialsControllerService();
+
+        runner.addControllerService(CONTROLLER_SERVICE, credentialsControllerService, propertiesMap);
+        runner.enableControllerService(credentialsControllerService);
+        runner.assertValid(credentialsControllerService);
+
+        return runner;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/444caf8a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatchIT.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatchIT.java
 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatchIT.java
new file mode 100644
index 0000000..8686213
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatchIT.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.gcp.bigquery;
+
+import com.google.cloud.bigquery.FormatOptions;
+import org.apache.nifi.processors.gcp.AbstractGCPProcessor;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.BufferedWriter;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
+/**
+ * Integration tests for {@link PutBigQueryBatch}. Each test loads newline-delimited
+ * JSON into a table named after the test method, inside the dataset created by
+ * {@link AbstractBigQueryIT}.
+ */
+public class PutBigQueryBatchIT extends AbstractBigQueryIT {
+
+    private static final String TABLE_SCHEMA_STRING = "[\n" +
+            "  {\n" +
+            "    \"description\": \"field 1\",\n" +
+            "    \"mode\": \"REQUIRED\",\n" +
+            "    \"name\": \"field_1\",\n" +
+            "    \"type\": \"STRING\"\n" +
+            "  },\n" +
+            "  {\n" +
+            "    \"description\": \"field 2\",\n" +
+            "    \"mode\": \"REQUIRED\",\n" +
+            "    \"name\": \"field_2\",\n" +
+            "    \"type\": \"STRING\"\n" +
+            "  },\n" +
+            "  {\n" +
+            "    \"description\": \"field 3\",\n" +
+            "    \"mode\": \"NULLABLE\",\n" +
+            "    \"name\": \"field_3\",\n" +
+            "    \"type\": \"STRING\"\n" +
+            "  }\n" +
+            "]";
+
+    @Before
+    public void setup() {
+        runner = TestRunners.newTestRunner(PutBigQueryBatch.class);
+    }
+
+    /**
+     * Applies the configuration shared by every test: the credentials service, the
+     * target dataset and table, the JSON source type and the table schema.
+     *
+     * @param tableName name of the table the test loads into
+     * @throws InitializationException if the credentials service cannot be registered
+     */
+    private void configureRunner(String tableName) throws InitializationException {
+        runner = setCredentialsControllerService(runner);
+        runner.setProperty(AbstractGCPProcessor.GCP_CREDENTIALS_PROVIDER_SERVICE, CONTROLLER_SERVICE);
+        runner.setProperty(BigQueryAttributes.DATASET_ATTR, dataset.getDatasetId().getDataset());
+        runner.setProperty(BigQueryAttributes.TABLE_NAME_ATTR, tableName);
+        runner.setProperty(BigQueryAttributes.SOURCE_TYPE_ATTR, FormatOptions.json().getType());
+        runner.setProperty(BigQueryAttributes.TABLE_SCHEMA_ATTR, TABLE_SCHEMA_STRING);
+    }
+
+    /**
+     * A single record that supplies both required fields is routed to success
+     * without any job error attribute.
+     */
+    @Test
+    public void PutBigQueryBatchSmallPayloadTest() throws Exception {
+        String methodName = Thread.currentThread().getStackTrace()[1].getMethodName();
+        configureRunner(methodName);
+
+        String str = "{\"field_1\":\"Daniel is great\",\"field_2\":\"Daniel is great\"}\r\n";
+
+        runner.enqueue(new ByteArrayInputStream(str.getBytes(StandardCharsets.UTF_8)));
+        runner.run(1);
+        for (MockFlowFile flowFile : runner.getFlowFilesForRelationship(AbstractBigQueryProcessor.REL_SUCCESS)) {
+            validateNoServiceExceptionAttribute(flowFile);
+        }
+        runner.assertAllFlowFilesTransferred(AbstractBigQueryProcessor.REL_SUCCESS, 1);
+    }
+
+    /**
+     * A record that omits the required {@code field_2} is routed to failure.
+     */
+    @Test
+    public void PutBigQueryBatchBadRecordTest() throws Exception {
+        String methodName = Thread.currentThread().getStackTrace()[1].getMethodName();
+        configureRunner(methodName);
+
+        String str = "{\"field_1\":\"Daniel is great\"}\r\n";
+
+        runner.enqueue(new ByteArrayInputStream(str.getBytes(StandardCharsets.UTF_8)));
+        runner.run(1);
+        runner.assertAllFlowFilesTransferred(AbstractBigQueryProcessor.REL_FAILURE, 1);
+    }
+
+    /**
+     * Two million copies of a long record, staged in a temporary file, are loaded as
+     * one flow file and routed to success.
+     */
+    @Test
+    public void PutBigQueryBatchLargePayloadTest() throws InitializationException, IOException {
+        String methodName = Thread.currentThread().getStackTrace()[1].getMethodName();
+        configureRunner(methodName);
+
+        // Allow one bad record to deal with the extra line break.
+        runner.setProperty(BigQueryAttributes.MAX_BADRECORDS_ATTR, String.valueOf(1));
+
+        String str = "{\"field_1\":\"Daniel is great\",\"field_2\":\"Here's to the crazy ones. The misfits. The rebels. The troublemakers." +
+                " The round pegs in the square holes. The ones who see things differently. They're not fond of rules. And they have no respect" +
+                " for the status quo. You can quote them, disagree with them, glorify or vilify them. About the only thing you can't do is ignore" +
+                " them. Because they change things. They push the human race forward. And while some may see them as the crazy ones, we see genius." +
+                " Because the people who are crazy enough to think they can change the world, are the ones who do.\"}\n";
+        Path tempFile = Files.createTempFile(methodName, "");
+        try {
+            // Closing the writer flushes whatever is still buffered.
+            try (BufferedWriter writer = Files.newBufferedWriter(tempFile)) {
+                for (int i = 0; i < 2; i++) {
+                    for (int ii = 0; ii < 1_000_000; ii++) {
+                        writer.write(str);
+                    }
+                    writer.flush();
+                }
+            }
+
+            runner.enqueue(tempFile);
+            runner.run(1);
+            for (MockFlowFile flowFile : runner.getFlowFilesForRelationship(AbstractBigQueryProcessor.REL_SUCCESS)) {
+                validateNoServiceExceptionAttribute(flowFile);
+            }
+            runner.assertAllFlowFilesTransferred(AbstractBigQueryProcessor.REL_SUCCESS, 1);
+        } finally {
+            // The staging file is very large; never leave it behind, even when the test fails.
+            Files.deleteIfExists(tempFile);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/444caf8a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatchTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatchTest.java
 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatchTest.java
new file mode 100644
index 0000000..7ec5aa9
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatchTest.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.gcp.bigquery;
+
+import com.google.cloud.RetryOption;
+import com.google.cloud.bigquery.BigQuery;
+import com.google.cloud.bigquery.BigQueryException;
+import com.google.cloud.bigquery.FormatOptions;
+import com.google.cloud.bigquery.Job;
+import com.google.cloud.bigquery.JobInfo;
+import com.google.cloud.bigquery.JobStatistics;
+import com.google.cloud.bigquery.JobStatus;
+import com.google.cloud.bigquery.Table;
+import com.google.cloud.bigquery.TableDataWriteChannel;
+import com.google.cloud.bigquery.WriteChannelConfiguration;
+import org.apache.nifi.util.TestRunner;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mock;
+
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for {@link PutBigQueryBatch}. The BigQuery client is replaced by the
+ * {@code bq} mock inherited from {@link AbstractBQTest}, so no remote call is made.
+ */
+public class PutBigQueryBatchTest extends AbstractBQTest {
+    private static final String TABLENAME = "test_table";
+    private static final String TABLE_SCHEMA = "[{ \"mode\": \"NULLABLE\", \"name\": \"data\", \"type\": \"STRING\" }]";
+    private static final String SOURCE_TYPE = FormatOptions.json().getType();
+    private static final String CREATE_DISPOSITION = JobInfo.CreateDisposition.CREATE_IF_NEEDED.name();
+    private static final String WRITE_DISPOSITION = JobInfo.WriteDisposition.WRITE_EMPTY.name();
+    private static final String MAXBAD_RECORDS = "0";
+    private static final String IGNORE_UNKNOWN = "true";
+    private static final String READ_TIMEOUT = "5 minutes";
+
+    // The BigQuery mock is the inherited AbstractBQTest.bq. Declaring another field
+    // with the same name here would hide it, leaving the inherited test and this
+    // class's processor working against two different mocks.
+
+    @Mock
+    Table table;
+
+    @Mock
+    Job job;
+
+    @Mock
+    JobStatus jobStatus;
+
+    @Mock
+    JobStatistics stats;
+
+    @Mock
+    TableDataWriteChannel tableDataWriteChannel;
+
+    @Before
+    public void setup() throws Exception {
+        super.setup();
+        reset(bq);
+        reset(table);
+        reset(job);
+        reset(jobStatus);
+        reset(stats);
+        reset(tableDataWriteChannel);
+    }
+
+    @Override
+    public AbstractBigQueryProcessor getProcessor() {
+        return new PutBigQueryBatch() {
+            @Override
+            protected BigQuery getCloudService() {
+                return bq;
+            }
+        };
+    }
+
+    @Override
+    protected void addRequiredPropertiesToRunner(TestRunner runner) {
+        runner.setProperty(PutBigQueryBatch.DATASET, DATASET);
+        runner.setProperty(PutBigQueryBatch.TABLE_NAME, TABLENAME);
+        runner.setProperty(PutBigQueryBatch.TABLE_SCHEMA, TABLE_SCHEMA);
+        runner.setProperty(PutBigQueryBatch.SOURCE_TYPE, SOURCE_TYPE);
+        runner.setProperty(PutBigQueryBatch.CREATE_DISPOSITION, CREATE_DISPOSITION);
+        runner.setProperty(PutBigQueryBatch.WRITE_DISPOSITION, WRITE_DISPOSITION);
+        runner.setProperty(PutBigQueryBatch.MAXBAD_RECORDS, MAXBAD_RECORDS);
+        runner.setProperty(PutBigQueryBatch.IGNORE_UNKNOWN, IGNORE_UNKNOWN);
+        runner.setProperty(PutBigQueryBatch.READ_TIMEOUT, READ_TIMEOUT);
+    }
+
+    /**
+     * Stubs everything both tests share: the client hands out the mocked write channel
+     * and job, and the job exposes the mocked status and statistics. What
+     * {@code job.waitFor} does is left to the individual test.
+     */
+    private void stubJobLifecycle() {
+        when(table.exists()).thenReturn(Boolean.TRUE);
+        when(bq.create(ArgumentMatchers.isA(JobInfo.class))).thenReturn(job);
+        when(bq.writer(ArgumentMatchers.isA(WriteChannelConfiguration.class))).thenReturn(tableDataWriteChannel);
+        when(tableDataWriteChannel.getJob()).thenReturn(job);
+        when(job.getStatus()).thenReturn(jobStatus);
+        when(job.getStatistics()).thenReturn(stats);
+
+        when(stats.getCreationTime()).thenReturn(0L);
+        when(stats.getStartTime()).thenReturn(1L);
+        when(stats.getEndTime()).thenReturn(2L);
+    }
+
+    /**
+     * Builds a fully configured runner, enqueues one JSON record and triggers the
+     * processor once.
+     *
+     * @return the runner after the run, ready for assertions
+     */
+    private TestRunner runSingleLoad() throws Exception {
+        final TestRunner runner = buildNewRunner(getProcessor());
+        addRequiredPropertiesToRunner(runner);
+        runner.assertValid();
+
+        runner.enqueue("{ \"data\": \"datavalue\" }");
+
+        runner.run();
+        return runner;
+    }
+
+    /**
+     * When waiting for the job returns the job, the flow file is routed to success.
+     */
+    @Test
+    public void testSuccessfulLoad() throws Exception {
+        stubJobLifecycle();
+        when(job.waitFor(ArgumentMatchers.isA(RetryOption.class))).thenReturn(job);
+
+        final TestRunner runner = runSingleLoad();
+
+        // Pin the count so the test cannot pass with zero flow files transferred.
+        runner.assertAllFlowFilesTransferred(PutBigQueryBatch.REL_SUCCESS, 1);
+    }
+
+
+    /**
+     * When waiting for the job throws a {@link BigQueryException}, the flow file is
+     * routed to failure.
+     */
+    @Test
+    public void testFailedLoad() throws Exception {
+        stubJobLifecycle();
+        when(job.waitFor(ArgumentMatchers.isA(RetryOption.class))).thenThrow(BigQueryException.class);
+
+        final TestRunner runner = runSingleLoad();
+
+        // Pin the count so the test cannot pass with zero flow files transferred.
+        runner.assertAllFlowFilesTransferred(PutBigQueryBatch.REL_FAILURE, 1);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/444caf8a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/ListGCSBucketTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/ListGCSBucketTest.java
 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/ListGCSBucketTest.java
index eca81bb..e17cf4b 100644
--- 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/ListGCSBucketTest.java
+++ 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/ListGCSBucketTest.java
@@ -16,7 +16,6 @@
  */
 package org.apache.nifi.processors.gcp.storage;
 
-
 import com.google.api.gax.paging.Page;
 import com.google.cloud.storage.Acl;
 import com.google.cloud.storage.Blob;

http://git-wip-us.apache.org/repos/asf/nifi/blob/444caf8a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-services-api/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-services-api/pom.xml 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-services-api/pom.xml
index b8bd9f7..658350e 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-services-api/pom.xml
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-services-api/pom.xml
@@ -33,6 +33,7 @@
         <dependency>
             <groupId>com.google.auth</groupId>
             <artifactId>google-auth-library-oauth2-http</artifactId>
+            <version>0.9.0</version>
             <exclusions>
                 <exclusion>
                     <groupId>com.google.code.findbugs</groupId>

Reply via email to