NIFI-4731 This closes #3019.  This closes #2682.  This closes #2420.
NIFI-4933 BigQuery PR Review

Signed-off-by: joewitt <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/03ef6465
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/03ef6465
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/03ef6465

Branch: refs/heads/master
Commit: 03ef6465478eea09704a2b6eb9be16f5001007b9
Parents: 444caf8
Author: Pierre Villard <[email protected]>
Authored: Tue Sep 18 23:19:03 2018 +0200
Committer: joewitt <[email protected]>
Committed: Wed Nov 21 10:28:40 2018 -0500

----------------------------------------------------------------------
 .../processors/gcp/AbstractGCPProcessor.java    |  47 ++-
 .../gcp/bigquery/AbstractBigQueryProcessor.java |  62 +++-
 .../gcp/bigquery/BigQueryAttributes.java        | 101 ++++--
 .../processors/gcp/bigquery/BigQueryUtils.java  |  84 +++++
 .../nifi/processors/gcp/bigquery/BqUtils.java   |  84 -----
 .../gcp/bigquery/PutBigQueryBatch.java          | 308 ++++++++++++-------
 .../gcp/pubsub/AbstractGCPubSubProcessor.java   |  21 ++
 .../gcp/storage/AbstractGCSProcessor.java       |  27 +-
 .../nifi-gcp-services-api/pom.xml               |   2 +-
 nifi-nar-bundles/nifi-gcp-bundle/pom.xml        |   5 +-
 10 files changed, 479 insertions(+), 262 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/03ef6465/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/AbstractGCPProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/AbstractGCPProcessor.java
 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/AbstractGCPProcessor.java
index e95bf73..0c360d1 100644
--- 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/AbstractGCPProcessor.java
+++ 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/AbstractGCPProcessor.java
@@ -19,16 +19,20 @@ package org.apache.nifi.processors.gcp;
 import com.google.auth.oauth2.GoogleCredentials;
 import com.google.cloud.Service;
 import com.google.cloud.ServiceOptions;
+import com.google.cloud.TransportOptions;
+import com.google.cloud.http.HttpTransportOptions;
 import com.google.common.collect.ImmutableList;
+
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.gcp.credentials.service.GCPCredentialsService;
 import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.gcp.credentials.service.GCPCredentialsService;
 import org.apache.nifi.proxy.ProxyConfiguration;
 
+import java.net.Proxy;
 import java.util.List;
 
 /**
@@ -65,7 +69,7 @@ public abstract class AbstractGCPProcessor<
                     "-Djdk.http.auth.tunneling.disabledSchemes=\n" +
                     "-Djdk.http.auth.proxying.disabledSchemes=")
             .required(false)
-            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .build();
 
@@ -74,14 +78,14 @@ public abstract class AbstractGCPProcessor<
             .displayName("Proxy port")
             .description("Proxy port number")
             .required(false)
-            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
-            .addValidator(StandardValidators.INTEGER_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.PORT_VALIDATOR)
             .build();
 
     public static final PropertyDescriptor HTTP_PROXY_USERNAME = new 
PropertyDescriptor
             .Builder().name("gcp-proxy-user-name")
-            .displayName("Http Proxy Username")
-            .description("Http Proxy Username")
+            .displayName("HTTP Proxy Username")
+            .description("HTTP Proxy Username")
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .required(false)
@@ -89,8 +93,8 @@ public abstract class AbstractGCPProcessor<
 
     public static final PropertyDescriptor HTTP_PROXY_PASSWORD = new 
PropertyDescriptor
             .Builder().name("gcp-proxy-user-password")
-            .displayName("Http Proxy Password")
-            .description("Http Proxy Password")
+            .displayName("HTTP Proxy Password")
+            .description("HTTP Proxy Password")
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .required(false)
@@ -160,4 +164,31 @@ public abstract class AbstractGCPProcessor<
      * @see <a 
href="http://googlecloudplatform.github.io/google-cloud-java/0.8.0/apidocs/com/google/cloud/ServiceOptions.html";>ServiceOptions</a>
      */
     protected abstract CloudServiceOptions getServiceOptions(ProcessContext 
context, GoogleCredentials credentials);
+
+    /**
+     * Builds the Transport Options containing the proxy configuration
+     * @param context Context to get properties
+     * @return Transport options object with proxy configuration
+     */
+    protected TransportOptions getTransportOptions(ProcessContext context) {
+        final ProxyConfiguration proxyConfiguration = 
ProxyConfiguration.getConfiguration(context, () -> {
+            final String proxyHost = 
context.getProperty(PROXY_HOST).evaluateAttributeExpressions().getValue();
+            final Integer proxyPort = 
context.getProperty(PROXY_PORT).evaluateAttributeExpressions().asInteger();
+            if (proxyHost != null && proxyPort != null && proxyPort > 0) {
+                final ProxyConfiguration componentProxyConfig = new 
ProxyConfiguration();
+                final String proxyUser = 
context.getProperty(HTTP_PROXY_USERNAME).evaluateAttributeExpressions().getValue();
+                final String proxyPassword = 
context.getProperty(HTTP_PROXY_PASSWORD).evaluateAttributeExpressions().getValue();
+                componentProxyConfig.setProxyType(Proxy.Type.HTTP);
+                componentProxyConfig.setProxyServerHost(proxyHost);
+                componentProxyConfig.setProxyServerPort(proxyPort);
+                componentProxyConfig.setProxyUserName(proxyUser);
+                componentProxyConfig.setProxyUserPassword(proxyPassword);
+                return componentProxyConfig;
+            }
+            return ProxyConfiguration.DIRECT_CONFIGURATION;
+        });
+
+        final ProxyAwareTransportFactory transportFactory = new 
ProxyAwareTransportFactory(proxyConfiguration);
+        return 
HttpTransportOptions.newBuilder().setHttpTransportFactory(transportFactory).build();
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/03ef6465/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryProcessor.java
 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryProcessor.java
index b52a552..c249e7e 100644
--- 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryProcessor.java
+++ 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryProcessor.java
@@ -22,15 +22,21 @@ import com.google.auth.oauth2.GoogleCredentials;
 import com.google.cloud.bigquery.BigQuery;
 import com.google.cloud.bigquery.BigQueryOptions;
 import com.google.common.collect.ImmutableList;
+
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processors.gcp.AbstractGCPProcessor;
+import org.apache.nifi.processors.gcp.ProxyAwareTransportFactory;
+import org.apache.nifi.proxy.ProxyConfiguration;
 import org.apache.nifi.util.StringUtils;
 
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
@@ -40,7 +46,9 @@ import java.util.Set;
  * Base class for creating processors that connect to GCP BiqQuery service
  */
 public abstract class AbstractBigQueryProcessor extends 
AbstractGCPProcessor<BigQuery, BigQueryOptions> {
+
     static final int BUFFER_SIZE = 65536;
+
     public static final Relationship REL_SUCCESS =
             new Relationship.Builder().name("success")
                     .description("FlowFiles are routed to this relationship 
after a successful Google BigQuery operation.")
@@ -53,8 +61,8 @@ public abstract class AbstractBigQueryProcessor extends 
AbstractGCPProcessor<Big
     public static final Set<Relationship> relationships = 
Collections.unmodifiableSet(
             new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE)));
 
-    public static final PropertyDescriptor DATASET = new PropertyDescriptor
-            .Builder().name(BigQueryAttributes.DATASET_ATTR)
+    public static final PropertyDescriptor DATASET = new 
PropertyDescriptor.Builder()
+            .name(BigQueryAttributes.DATASET_ATTR)
             .displayName("Dataset")
             .description(BigQueryAttributes.DATASET_DESC)
             .required(true)
@@ -63,8 +71,8 @@ public abstract class AbstractBigQueryProcessor extends 
AbstractGCPProcessor<Big
             .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
             .build();
 
-    public static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor
-            .Builder().name(BigQueryAttributes.TABLE_NAME_ATTR)
+    public static final PropertyDescriptor TABLE_NAME = new 
PropertyDescriptor.Builder()
+            .name(BigQueryAttributes.TABLE_NAME_ATTR)
             .displayName("Table Name")
             .description(BigQueryAttributes.TABLE_NAME_DESC)
             .required(true)
@@ -73,22 +81,22 @@ public abstract class AbstractBigQueryProcessor extends 
AbstractGCPProcessor<Big
             .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
             .build();
 
-    public static final PropertyDescriptor TABLE_SCHEMA = new 
PropertyDescriptor
-            .Builder().name(BigQueryAttributes.TABLE_SCHEMA_ATTR)
+    public static final PropertyDescriptor TABLE_SCHEMA = new 
PropertyDescriptor.Builder()
+            .name(BigQueryAttributes.TABLE_SCHEMA_ATTR)
             .displayName("Table Schema")
             .description(BigQueryAttributes.TABLE_SCHEMA_DESC)
             .required(false)
-            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .build();
 
-    public static final PropertyDescriptor READ_TIMEOUT = new 
PropertyDescriptor
-            .Builder().name(BigQueryAttributes.JOB_READ_TIMEOUT_ATTR)
+    public static final PropertyDescriptor READ_TIMEOUT = new 
PropertyDescriptor.Builder()
+            .name(BigQueryAttributes.JOB_READ_TIMEOUT_ATTR)
             .displayName("Read Timeout")
             .description(BigQueryAttributes.JOB_READ_TIMEOUT_DESC)
             .required(true)
             .defaultValue("5 minutes")
-            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
             .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
             .build();
 
@@ -101,6 +109,10 @@ public abstract class AbstractBigQueryProcessor extends 
AbstractGCPProcessor<Big
     public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         return ImmutableList.<PropertyDescriptor>builder()
                 .addAll(super.getSupportedPropertyDescriptors())
+                .add(DATASET)
+                .add(TABLE_NAME)
+                .add(TABLE_SCHEMA)
+                .add(READ_TIMEOUT)
                 .build();
     }
 
@@ -109,14 +121,40 @@ public abstract class AbstractBigQueryProcessor extends 
AbstractGCPProcessor<Big
         final String projectId = 
context.getProperty(PROJECT_ID).evaluateAttributeExpressions().getValue();
         final Integer retryCount = 
Integer.valueOf(context.getProperty(RETRY_COUNT).getValue());
 
-        BigQueryOptions.Builder builder = 
BigQueryOptions.newBuilder().setCredentials(credentials);
+        final BigQueryOptions.Builder builder = BigQueryOptions.newBuilder();
 
         if (!StringUtils.isBlank(projectId)) {
             builder.setProjectId(projectId);
         }
 
-        return builder
+        return builder.setCredentials(credentials)
                 
.setRetrySettings(RetrySettings.newBuilder().setMaxAttempts(retryCount).build())
+                .setTransportOptions(getTransportOptions(context))
                 .build();
     }
+
+    @Override
+    protected final Collection<ValidationResult> 
customValidate(ValidationContext validationContext) {
+        final Collection<ValidationResult> results = 
super.customValidate(validationContext);
+        ProxyConfiguration.validateProxySpec(validationContext, results, 
ProxyAwareTransportFactory.PROXY_SPECS);
+
+        final boolean projectId = 
validationContext.getProperty(PROJECT_ID).isSet();
+        if (!projectId) {
+            results.add(new ValidationResult.Builder()
+                    .subject(PROJECT_ID.getName())
+                    .valid(false)
+                    .explanation("The Project ID must be set for this 
processor.")
+                    .build());
+        }
+
+        customValidate(validationContext, results);
+        return results;
+    }
+
+    /**
+     * If sub-classes need to implement any custom validation, override this 
method and add the validation results to the results collection.
+     */
+    protected void customValidate(ValidationContext validationContext, 
Collection<ValidationResult> results) {
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/03ef6465/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryAttributes.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryAttributes.java
 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryAttributes.java
index 380f701..842a176 100644
--- 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryAttributes.java
+++ 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryAttributes.java
@@ -17,9 +17,12 @@
 
 package org.apache.nifi.processors.gcp.bigquery;
 
+import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import 
org.apache.nifi.processors.gcp.credentials.factory.CredentialPropertyDescriptors;
 
+import com.google.cloud.bigquery.JobInfo;
+
 /**
  * Attributes associated with the BigQuery processors
  */
@@ -28,44 +31,75 @@ public class BigQueryAttributes {
 
     public static final PropertyDescriptor SERVICE_ACCOUNT_JSON_FILE = 
CredentialPropertyDescriptors.SERVICE_ACCOUNT_JSON_FILE;
 
+    // Properties
+    public static final String SOURCE_TYPE_ATTR = "bq.load.type";
+    public static final String SOURCE_TYPE_DESC = "Data type of the file to be 
loaded. Possible values: AVRO, "
+            + "NEWLINE_DELIMITED_JSON, CSV.";
+
+    public static final String IGNORE_UNKNOWN_ATTR = "bq.load.ignore_unknown";
+    public static final String IGNORE_UNKNOWN_DESC = "Sets whether BigQuery 
should allow extra values that are not represented "
+            + "in the table schema. If true, the extra values are ignored. If 
false, records with extra columns are treated as "
+            + "bad records, and if there are too many bad records, an invalid 
error is returned in the job result. By default "
+            + "unknown values are not allowed.";
+
+    public static final String WRITE_DISPOSITION_ATTR = 
"bq.load.write_disposition";
+    public static final String WRITE_DISPOSITION_DESC = "Sets the action that 
should occur if the destination table already exists.";
+
+    public static final String MAX_BADRECORDS_ATTR = "bq.load.max_badrecords";
+    public static final String MAX_BADRECORDS_DESC = "Sets the maximum number 
of bad records that BigQuery can ignore when running "
+            + "the job. If the number of bad records exceeds this value, an 
invalid error is returned in the job result. By default "
+            + "no bad record is ignored.";
+
     public static final String DATASET_ATTR = "bq.dataset";
-    public static final String DATASET_DESC = "BigQuery dataset";
+    public static final String DATASET_DESC = "BigQuery dataset name (Note - 
The dataset must exist in GCP)";
 
     public static final String TABLE_NAME_ATTR = "bq.table.name";
     public static final String TABLE_NAME_DESC = "BigQuery table name";
 
     public static final String TABLE_SCHEMA_ATTR = "bq.table.schema";
-    public static final String TABLE_SCHEMA_DESC = "BigQuery table name";
+    public static final String TABLE_SCHEMA_DESC = "BigQuery schema in JSON 
format";
 
     public static final String CREATE_DISPOSITION_ATTR = 
"bq.load.create_disposition";
-    public static final String CREATE_DISPOSITION_DESC = "Options for table 
creation";
+    public static final String CREATE_DISPOSITION_DESC = "Sets whether the job 
is allowed to create new tables";
 
-    public static final String JOB_ERROR_MSG_ATTR = "bq.error.message";
-    public static final String JOB_ERROR_MSG_DESC = "Load job error message";
+    public static final String JOB_READ_TIMEOUT_ATTR = "bq.readtimeout";
+    public static final String JOB_READ_TIMEOUT_DESC = "Load Job Time Out";
 
-    public static final String JOB_ERROR_REASON_ATTR = "bq.error.reason";
-    public static final String JOB_ERROR_REASON_DESC = "Load job error reason";
+    public static final String CSV_ALLOW_JAGGED_ROWS_ATTR = 
"bq.csv.allow.jagged.rows";
+    public static final String CSV_ALLOW_JAGGED_ROWS_DESC = "Set whether 
BigQuery should accept rows that are missing "
+            + "trailing optional columns. If true, BigQuery treats missing 
trailing columns as null values. If false, "
+            + "records with missing trailing columns are treated as bad 
records, and if there are too many bad records, "
+            + "an invalid error is returned in the job result. By default, 
rows with missing trailing columns are "
+            + "considered bad records.";
 
-    public static final String JOB_ERROR_LOCATION_ATTR = "bq.error.location";
-    public static final String JOB_ERROR_LOCATION_DESC = "Load job error 
location";
+    public static final String CSV_ALLOW_QUOTED_NEW_LINES_ATTR = 
"bq.csv.allow.quoted.new.lines";
+    public static final String CSV_ALLOW_QUOTED_NEW_LINES_DESC = "Sets whether 
BigQuery should allow quoted data sections "
+            + "that contain newline characters in a CSV file. By default 
quoted newline are not allowed.";
 
-    public static final String JOB_READ_TIMEOUT_ATTR = "bq.readtimeout";
-    public static final String JOB_READ_TIMEOUT_DESC = "Load Job Time Out";
+    public static final String CSV_CHARSET_ATTR = "bq.csv.charset";
+    public static final String CSV_CHARSET_DESC = "Sets the character encoding 
of the data.";
 
+    public static final String CSV_FIELD_DELIMITER_ATTR = "bq.csv.delimiter";
+    public static final String CSV_FIELD_DELIMITER_DESC = "Sets the separator 
for fields in a CSV file. BigQuery converts "
+            + "the string to ISO-8859-1 encoding, and then uses the first byte 
of the encoded string to split the data in its "
+            + "raw, binary state. BigQuery also supports the escape sequence 
\"\t\" to specify a tab separator. The default "
+            + "value is a comma (',').";
 
-    // Batch Attributes
-    public static final String SOURCE_TYPE_ATTR = "bq.load.type";
-    public static final String SOURCE_TYPE_DESC = "Data type of the file to be 
loaded";
+    public static final String CSV_QUOTE_ATTR = "bq.csv.quote";
+    public static final String CSV_QUOTE_DESC = "Sets the value that is used 
to quote data sections in a CSV file. BigQuery "
+            + "converts the string to ISO-8859-1 encoding, and then uses the 
first byte of the encoded string to split the "
+            + "data in its raw, binary state. The default value is a 
double-quote ('\"'). If your data does not contain quoted "
+            + "sections, set the property value to an empty string. If your 
data contains quoted newline characters, you must "
+            + "also set the Allow Quoted New Lines property to true.";
 
-    public static final String IGNORE_UNKNOWN_ATTR = "bq.load.ignore_unknown";
-    public static final String IGNORE_UNKNOWN_DESC = "Ignore fields not in 
table schema";
+    public static final String CSV_SKIP_LEADING_ROWS_ATTR = 
"bq.csv.skip.leading.rows";
+    public static final String CSV_SKIP_LEADING_ROWS_DESC = "Sets the number 
of rows at the top of a CSV file that BigQuery "
+            + "will skip when reading the data. The default value is 0. This 
property is useful if you have header rows in the "
+            + "file that should be skipped.";
 
-    public static final String WRITE_DISPOSITION_ATTR = 
"bq.load.write_disposition";
-    public static final String WRITE_DISPOSITION_DESC = "Options for writing 
to table";
 
-    public static final String MAX_BADRECORDS_ATTR = "bq.load.max_badrecords";
-    public static final String MAX_BADRECORDS_DESC = "Number of erroneous 
records to ignore before generating an error";
 
+    // Batch Attributes
     public static final String JOB_CREATE_TIME_ATTR = 
"bq.job.stat.creation_time";
     public static final String JOB_CREATE_TIME_DESC = "Time load job creation";
 
@@ -77,4 +111,31 @@ public class BigQueryAttributes {
 
     public static final String JOB_LINK_ATTR = "bq.job.link";
     public static final String JOB_LINK_DESC = "API Link to load job";
+
+    public static final String JOB_NB_RECORDS_ATTR = "bq.records.count";
+    public static final String JOB_NB_RECORDS_DESC = "Number of records 
successfully inserted";
+
+    public static final String JOB_ERROR_MSG_ATTR = "bq.error.message";
+    public static final String JOB_ERROR_MSG_DESC = "Load job error message";
+
+    public static final String JOB_ERROR_REASON_ATTR = "bq.error.reason";
+    public static final String JOB_ERROR_REASON_DESC = "Load job error reason";
+
+    public static final String JOB_ERROR_LOCATION_ATTR = "bq.error.location";
+    public static final String JOB_ERROR_LOCATION_DESC = "Load job error 
location";
+
+
+    // Allowable values
+    public static final AllowableValue CREATE_IF_NEEDED = new 
AllowableValue(JobInfo.CreateDisposition.CREATE_IF_NEEDED.name(),
+            JobInfo.CreateDisposition.CREATE_IF_NEEDED.name(), "Configures the 
job to create the table if it does not exist.");
+    public static final AllowableValue CREATE_NEVER = new 
AllowableValue(JobInfo.CreateDisposition.CREATE_NEVER.name(),
+            JobInfo.CreateDisposition.CREATE_NEVER.name(), "Configures the job 
to fail with a not-found error if the table does not exist.");
+
+    public static final AllowableValue WRITE_EMPTY = new 
AllowableValue(JobInfo.WriteDisposition.WRITE_EMPTY.name(),
+            JobInfo.WriteDisposition.WRITE_EMPTY.name(), "Configures the job 
to fail with a duplicate error if the table already exists.");
+    public static final AllowableValue WRITE_APPEND = new 
AllowableValue(JobInfo.WriteDisposition.WRITE_APPEND.name(),
+            JobInfo.WriteDisposition.WRITE_APPEND.name(), "Configures the job 
to append data to the table if it already exists.");
+    public static final AllowableValue WRITE_TRUNCATE = new 
AllowableValue(JobInfo.WriteDisposition.WRITE_TRUNCATE.name(),
+            JobInfo.WriteDisposition.WRITE_TRUNCATE.name(), "Configures the 
job to overwrite the table data if table already exists.");
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/03ef6465/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryUtils.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryUtils.java
 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryUtils.java
new file mode 100644
index 0000000..3139ffd
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BigQueryUtils.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.gcp.bigquery;
+
+import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import com.google.cloud.bigquery.Field;
+import com.google.cloud.bigquery.LegacySQLTypeName;
+import com.google.cloud.bigquery.Schema;
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
+
+
+/**
+ * Utility class for converting a JSON schema definition into BigQuery Schema and Field objects
+ */
+public class BigQueryUtils {
+
+    private final static Type gsonSchemaType = new TypeToken<List<Map>>() { 
}.getType();
+
+    public static Field mapToField(Map fMap) {
+        String typeStr = fMap.get("type").toString();
+        String nameStr = fMap.get("name").toString();
+        String modeStr = fMap.get("mode").toString();
+        LegacySQLTypeName type = null;
+
+        if (typeStr.equals("BOOLEAN")) {
+            type = LegacySQLTypeName.BOOLEAN;
+        } else if (typeStr.equals("STRING")) {
+            type = LegacySQLTypeName.STRING;
+        } else if (typeStr.equals("BYTES")) {
+            type = LegacySQLTypeName.BYTES;
+        } else if (typeStr.equals("INTEGER")) {
+            type = LegacySQLTypeName.INTEGER;
+        } else if (typeStr.equals("FLOAT")) {
+            type = LegacySQLTypeName.FLOAT;
+        } else if (typeStr.equals("TIMESTAMP") || typeStr.equals("DATE")
+                || typeStr.equals("TIME") || typeStr.equals("DATETIME")) {
+            type = LegacySQLTypeName.TIMESTAMP;
+        } else if (typeStr.equals("RECORD")) {
+            type = LegacySQLTypeName.RECORD;
+        }
+
+        return Field.newBuilder(nameStr, 
type).setMode(Field.Mode.valueOf(modeStr)).build();
+    }
+
+    public static List<Field> listToFields(List<Map> m_fields) {
+        List<Field> fields = new ArrayList(m_fields.size());
+        for (Map m : m_fields) {
+            fields.add(mapToField(m));
+        }
+
+        return fields;
+    }
+
+    public static Schema schemaFromString(String schemaStr) {
+        if (schemaStr == null) {
+            return null;
+        } else {
+            Gson gson = new Gson();
+            List<Map> fields = gson.fromJson(schemaStr, gsonSchemaType);
+            return Schema.of(BigQueryUtils.listToFields(fields));
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/03ef6465/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BqUtils.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BqUtils.java
 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BqUtils.java
deleted file mode 100644
index f7f5d66..0000000
--- 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/BqUtils.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.processors.gcp.bigquery;
-
-import com.google.cloud.bigquery.Field;
-import com.google.cloud.bigquery.LegacySQLTypeName;
-import com.google.cloud.bigquery.Schema;
-import com.google.gson.Gson;
-import com.google.gson.reflect.TypeToken;
-
-import java.lang.reflect.Type;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-
-/**
- *
- */
-public class BqUtils {
-    private final static Type gsonSchemaType = new TypeToken<List<Map>>() {
-    }.getType();
-
-    public static Field mapToField(Map fMap) {
-        String typeStr = fMap.get("type").toString();
-        String nameStr = fMap.get("name").toString();
-        String modeStr = fMap.get("mode").toString();
-        LegacySQLTypeName type = null;
-
-        if (typeStr.equals("BOOLEAN")) {
-            type = LegacySQLTypeName.BOOLEAN;
-        } else if (typeStr.equals("STRING")) {
-            type = LegacySQLTypeName.STRING;
-        } else if (typeStr.equals("BYTES")) {
-            type = LegacySQLTypeName.BYTES;
-        } else if (typeStr.equals("INTEGER")) {
-            type = LegacySQLTypeName.INTEGER;
-        } else if (typeStr.equals("FLOAT")) {
-            type = LegacySQLTypeName.FLOAT;
-        } else if (typeStr.equals("TIMESTAMP") || typeStr.equals("DATE")
-                || typeStr.equals("TIME") || typeStr.equals("DATETIME")) {
-            type = LegacySQLTypeName.TIMESTAMP;
-        } else if (typeStr.equals("RECORD")) {
-            type = LegacySQLTypeName.RECORD;
-        }
-
-        return Field.newBuilder(nameStr, 
type).setMode(Field.Mode.valueOf(modeStr)).build();
-    }
-
-    public static List<Field> listToFields(List<Map> m_fields) {
-        List<Field> fields = new ArrayList(m_fields.size());
-        for (Map m : m_fields) {
-            fields.add(mapToField(m));
-        }
-
-        return fields;
-    }
-
-    public static Schema schemaFromString(String schemaStr) {
-        if (schemaStr == null) {
-            return null;
-        } else {
-            Gson gson = new Gson();
-            List<Map> fields = gson.fromJson(schemaStr, gsonSchemaType);
-            return Schema.of(BqUtils.listToFields(fields));
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/03ef6465/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java
 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java
index 99c7f2a..5068ab5 100644
--- 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java
+++ 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java
@@ -18,15 +18,16 @@
 package org.apache.nifi.processors.gcp.bigquery;
 
 import com.google.cloud.RetryOption;
-import com.google.cloud.bigquery.BigQuery;
 import com.google.cloud.bigquery.FormatOptions;
 import com.google.cloud.bigquery.Job;
 import com.google.cloud.bigquery.JobInfo;
+import com.google.cloud.bigquery.JobStatistics.LoadStatistics;
 import com.google.cloud.bigquery.Schema;
 import com.google.cloud.bigquery.TableDataWriteChannel;
 import com.google.cloud.bigquery.TableId;
 import com.google.cloud.bigquery.WriteChannelConfiguration;
 import com.google.common.collect.ImmutableList;
+
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
@@ -35,7 +36,10 @@ import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.LogLevel;
 import org.apache.nifi.processor.ProcessContext;
@@ -51,6 +55,7 @@ import org.threeten.bp.temporal.ChronoUnit;
 import java.nio.ByteBuffer;
 import java.nio.channels.Channels;
 import java.nio.channels.ReadableByteChannel;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -59,40 +64,58 @@ import java.util.concurrent.TimeUnit;
 /**
  * A processor for batch loading data into a Google BigQuery table
  */
-
 @InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
 @Tags({"google", "google cloud", "bq", "bigquery"})
-@CapabilityDescription("Batch loads flow files to a Google BigQuery table.")
+@CapabilityDescription("Batch loads flow files content to a Google BigQuery 
table.")
 @SeeAlso({PutGCSObject.class, DeleteGCSObject.class})
-
 @WritesAttributes({
-        @WritesAttribute(attribute = BigQueryAttributes.DATASET_ATTR, 
description = BigQueryAttributes.DATASET_DESC),
-        @WritesAttribute(attribute = BigQueryAttributes.TABLE_NAME_ATTR, 
description = BigQueryAttributes.TABLE_NAME_DESC),
-        @WritesAttribute(attribute = BigQueryAttributes.TABLE_SCHEMA_ATTR, 
description = BigQueryAttributes.TABLE_SCHEMA_DESC),
-        @WritesAttribute(attribute = BigQueryAttributes.SOURCE_TYPE_ATTR, 
description = BigQueryAttributes.SOURCE_TYPE_DESC),
-        @WritesAttribute(attribute = BigQueryAttributes.IGNORE_UNKNOWN_ATTR, 
description = BigQueryAttributes.IGNORE_UNKNOWN_DESC),
-        @WritesAttribute(attribute = 
BigQueryAttributes.CREATE_DISPOSITION_ATTR, description = 
BigQueryAttributes.CREATE_DISPOSITION_DESC),
-        @WritesAttribute(attribute = 
BigQueryAttributes.WRITE_DISPOSITION_ATTR, description = 
BigQueryAttributes.WRITE_DISPOSITION_DESC),
-        @WritesAttribute(attribute = BigQueryAttributes.MAX_BADRECORDS_ATTR, 
description = BigQueryAttributes.MAX_BADRECORDS_DESC),
-        @WritesAttribute(attribute = BigQueryAttributes.JOB_CREATE_TIME_ATTR, 
description = BigQueryAttributes.JOB_CREATE_TIME_DESC),
-        @WritesAttribute(attribute = BigQueryAttributes.JOB_END_TIME_ATTR, 
description = BigQueryAttributes.JOB_END_TIME_DESC),
-        @WritesAttribute(attribute = BigQueryAttributes.JOB_START_TIME_ATTR, 
description = BigQueryAttributes.JOB_START_TIME_DESC),
-        @WritesAttribute(attribute = BigQueryAttributes.JOB_LINK_ATTR, 
description = BigQueryAttributes.JOB_LINK_DESC),
-        @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_MSG_ATTR, 
description = BigQueryAttributes.JOB_ERROR_MSG_DESC),
-        @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_REASON_ATTR, 
description = BigQueryAttributes.JOB_ERROR_REASON_DESC),
-        @WritesAttribute(attribute = 
BigQueryAttributes.JOB_ERROR_LOCATION_ATTR, description = 
BigQueryAttributes.JOB_ERROR_LOCATION_DESC)
+    @WritesAttribute(attribute = BigQueryAttributes.DATASET_ATTR, description 
= BigQueryAttributes.DATASET_DESC),
+    @WritesAttribute(attribute = BigQueryAttributes.TABLE_NAME_ATTR, 
description = BigQueryAttributes.TABLE_NAME_DESC),
+    @WritesAttribute(attribute = BigQueryAttributes.TABLE_SCHEMA_ATTR, 
description = BigQueryAttributes.TABLE_SCHEMA_DESC),
+    @WritesAttribute(attribute = BigQueryAttributes.SOURCE_TYPE_ATTR, 
description = BigQueryAttributes.SOURCE_TYPE_DESC),
+    @WritesAttribute(attribute = BigQueryAttributes.IGNORE_UNKNOWN_ATTR, 
description = BigQueryAttributes.IGNORE_UNKNOWN_DESC),
+    @WritesAttribute(attribute = BigQueryAttributes.CREATE_DISPOSITION_ATTR, 
description = BigQueryAttributes.CREATE_DISPOSITION_DESC),
+    @WritesAttribute(attribute = BigQueryAttributes.WRITE_DISPOSITION_ATTR, 
description = BigQueryAttributes.WRITE_DISPOSITION_DESC),
+    @WritesAttribute(attribute = BigQueryAttributes.MAX_BADRECORDS_ATTR, 
description = BigQueryAttributes.MAX_BADRECORDS_DESC),
+    @WritesAttribute(attribute = BigQueryAttributes.JOB_CREATE_TIME_ATTR, 
description = BigQueryAttributes.JOB_CREATE_TIME_DESC),
+    @WritesAttribute(attribute = BigQueryAttributes.JOB_END_TIME_ATTR, 
description = BigQueryAttributes.JOB_END_TIME_DESC),
+    @WritesAttribute(attribute = BigQueryAttributes.JOB_START_TIME_ATTR, 
description = BigQueryAttributes.JOB_START_TIME_DESC),
+    @WritesAttribute(attribute = BigQueryAttributes.JOB_LINK_ATTR, description 
= BigQueryAttributes.JOB_LINK_DESC),
+    @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_MSG_ATTR, 
description = BigQueryAttributes.JOB_ERROR_MSG_DESC),
+    @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_REASON_ATTR, 
description = BigQueryAttributes.JOB_ERROR_REASON_DESC),
+    @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_LOCATION_ATTR, 
description = BigQueryAttributes.JOB_ERROR_LOCATION_DESC),
+    @WritesAttribute(attribute = BigQueryAttributes.JOB_NB_RECORDS_ATTR, 
description = BigQueryAttributes.JOB_NB_RECORDS_DESC)
 })
-
 public class PutBigQueryBatch extends AbstractBigQueryProcessor {
 
+    private static final List<String> TYPES = 
Arrays.asList(FormatOptions.json().getType(), FormatOptions.csv().getType(), 
FormatOptions.avro().getType());
+
+    private static final Validator FORMAT_VALIDATOR = new Validator() {
+        @Override
+        public ValidationResult validate(final String subject, final String 
input, final ValidationContext context) {
+            final ValidationResult.Builder builder = new 
ValidationResult.Builder();
+            builder.subject(subject).input(input);
+            if (context.isExpressionLanguageSupported(subject) && 
context.isExpressionLanguagePresent(input)) {
+                return builder.valid(true).explanation("Contains Expression 
Language").build();
+            }
+
+            if(TYPES.contains(input.toUpperCase())) {
+                builder.valid(true);
+            } else {
+                builder.valid(false).explanation("Load File Type must be one 
of the following options: " + StringUtils.join(TYPES, ", "));
+            }
+
+            return builder.build();
+        }
+    };
+
     public static final PropertyDescriptor SOURCE_TYPE = new PropertyDescriptor
             .Builder().name(BigQueryAttributes.SOURCE_TYPE_ATTR)
             .displayName("Load file type")
             .description(BigQueryAttributes.SOURCE_TYPE_DESC)
             .required(true)
-            .allowableValues(FormatOptions.json().getType(), 
FormatOptions.avro().getType(), FormatOptions.csv().getType())
-            .defaultValue(FormatOptions.avro().getType())
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .addValidator(FORMAT_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
             .build();
 
     public static final PropertyDescriptor IGNORE_UNKNOWN = new 
PropertyDescriptor.Builder()
@@ -102,7 +125,7 @@ public class PutBigQueryBatch extends AbstractBigQueryProcessor {
             .required(true)
             .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
             .allowableValues("true", "false")
-            .defaultValue("true")
+            .defaultValue("false")
             .build();
 
     public static final PropertyDescriptor CREATE_DISPOSITION = new 
PropertyDescriptor.Builder()
@@ -110,8 +133,8 @@ public class PutBigQueryBatch extends AbstractBigQueryProcessor {
             .displayName("Create Disposition")
             .description(BigQueryAttributes.CREATE_DISPOSITION_DESC)
             .required(true)
-            
.allowableValues(JobInfo.CreateDisposition.CREATE_IF_NEEDED.name(), 
JobInfo.CreateDisposition.CREATE_NEVER.name())
-            .defaultValue(JobInfo.CreateDisposition.CREATE_IF_NEEDED.name())
+            .allowableValues(BigQueryAttributes.CREATE_IF_NEEDED, 
BigQueryAttributes.CREATE_NEVER)
+            .defaultValue(BigQueryAttributes.CREATE_IF_NEEDED.getValue())
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .build();
 
@@ -120,8 +143,8 @@ public class PutBigQueryBatch extends AbstractBigQueryProcessor {
             .displayName("Write Disposition")
             .description(BigQueryAttributes.WRITE_DISPOSITION_DESC)
             .required(true)
-            .allowableValues(JobInfo.WriteDisposition.WRITE_EMPTY.name(), 
JobInfo.WriteDisposition.WRITE_APPEND.name(), 
JobInfo.WriteDisposition.WRITE_TRUNCATE.name())
-            .defaultValue(JobInfo.WriteDisposition.WRITE_EMPTY.name())
+            .allowableValues(BigQueryAttributes.WRITE_EMPTY, 
BigQueryAttributes.WRITE_APPEND, BigQueryAttributes.WRITE_TRUNCATE)
+            .defaultValue(BigQueryAttributes.WRITE_EMPTY.getValue())
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .build();
 
@@ -134,34 +157,78 @@ public class PutBigQueryBatch extends AbstractBigQueryProcessor {
             .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
             .build();
 
-    private Schema schemaCache = null;
+    public static final PropertyDescriptor CSV_ALLOW_JAGGED_ROWS = new 
PropertyDescriptor.Builder()
+            .name(BigQueryAttributes.CSV_ALLOW_JAGGED_ROWS_ATTR)
+            .displayName("CSV Input - Allow Jagged Rows")
+            .description(BigQueryAttributes.CSV_ALLOW_JAGGED_ROWS_DESC)
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .build();
+
+    public static final PropertyDescriptor CSV_ALLOW_QUOTED_NEW_LINES = new 
PropertyDescriptor.Builder()
+            .name(BigQueryAttributes.CSV_ALLOW_QUOTED_NEW_LINES_ATTR)
+            .displayName("CSV Input - Allow Quoted New Lines")
+            .description(BigQueryAttributes.CSV_ALLOW_QUOTED_NEW_LINES_DESC)
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .build();
 
-    public PutBigQueryBatch() {
+    public static final PropertyDescriptor CSV_CHARSET = new 
PropertyDescriptor.Builder()
+            .name(BigQueryAttributes.CSV_CHARSET_ATTR)
+            .displayName("CSV Input - Character Set")
+            .description(BigQueryAttributes.CSV_CHARSET_DESC)
+            .required(true)
+            .allowableValues("UTF-8", "ISO-8859-1")
+            .defaultValue("UTF-8")
+            .build();
 
-    }
+    public static final PropertyDescriptor CSV_FIELD_DELIMITER = new 
PropertyDescriptor.Builder()
+            .name(BigQueryAttributes.CSV_FIELD_DELIMITER_ATTR)
+            .displayName("CSV Input - Field Delimiter")
+            .description(BigQueryAttributes.CSV_FIELD_DELIMITER_DESC)
+            .required(true)
+            .defaultValue(",")
+            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    public static final PropertyDescriptor CSV_QUOTE = new 
PropertyDescriptor.Builder()
+            .name(BigQueryAttributes.CSV_QUOTE_ATTR)
+            .displayName("CSV Input - Quote")
+            .description(BigQueryAttributes.CSV_QUOTE_DESC)
+            .required(true)
+            .defaultValue("\"")
+            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    public static final PropertyDescriptor CSV_SKIP_LEADING_ROWS = new 
PropertyDescriptor.Builder()
+            .name(BigQueryAttributes.CSV_SKIP_LEADING_ROWS_ATTR)
+            .displayName("CSV Input - Skip Leading Rows")
+            .description(BigQueryAttributes.CSV_SKIP_LEADING_ROWS_DESC)
+            .required(true)
+            .defaultValue("0")
+            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
 
     @Override
     public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         return ImmutableList.<PropertyDescriptor>builder()
                 .addAll(super.getSupportedPropertyDescriptors())
-                .add(DATASET)
-                .add(TABLE_NAME)
-                .add(TABLE_SCHEMA)
                 .add(SOURCE_TYPE)
                 .add(CREATE_DISPOSITION)
                 .add(WRITE_DISPOSITION)
                 .add(MAXBAD_RECORDS)
                 .add(IGNORE_UNKNOWN)
-                .build();
-    }
-
-    @Override
-    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final 
String propertyDescriptorName) {
-        return new PropertyDescriptor.Builder()
-                .name(propertyDescriptorName)
-                .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-                .expressionLanguageSupported(true)
-                .dynamic(true)
+                .add(CSV_ALLOW_JAGGED_ROWS)
+                .add(CSV_ALLOW_QUOTED_NEW_LINES)
+                .add(CSV_CHARSET)
+                .add(CSV_FIELD_DELIMITER)
+                .add(CSV_QUOTE)
+                .add(CSV_SKIP_LEADING_ROWS)
                 .build();
     }
 
@@ -178,13 +245,10 @@ public class PutBigQueryBatch extends AbstractBigQueryProcessor {
             return;
         }
 
-        final Map<String, String> attributes = new HashMap<>();
-
-        final BigQuery bq = getCloudService();
-
         final String projectId = 
context.getProperty(PROJECT_ID).evaluateAttributeExpressions().getValue();
         final String dataset = 
context.getProperty(DATASET).evaluateAttributeExpressions(flowFile).getValue();
         final String tableName = 
context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
+        final String type = 
context.getProperty(SOURCE_TYPE).evaluateAttributeExpressions(flowFile).getValue();
 
         final TableId tableId;
         if (StringUtils.isEmpty(projectId)) {
@@ -193,70 +257,93 @@ public class PutBigQueryBatch extends AbstractBigQueryProcessor {
             tableId = TableId.of(projectId, dataset, tableName);
         }
 
-        final String fileType = context.getProperty(SOURCE_TYPE).getValue();
+        try {
 
-        String schemaString = 
context.getProperty(TABLE_SCHEMA).evaluateAttributeExpressions().getValue();
-        Schema schema = BqUtils.schemaFromString(schemaString);
+            FormatOptions formatOption;
 
-        WriteChannelConfiguration writeChannelConfiguration =
-                WriteChannelConfiguration.newBuilder(tableId)
-                        
.setCreateDisposition(JobInfo.CreateDisposition.valueOf(context.getProperty(CREATE_DISPOSITION).getValue()))
-                        
.setWriteDisposition(JobInfo.WriteDisposition.valueOf(context.getProperty(WRITE_DISPOSITION).getValue()))
-                        
.setIgnoreUnknownValues(context.getProperty(IGNORE_UNKNOWN).asBoolean())
-                        
.setMaxBadRecords(context.getProperty(MAXBAD_RECORDS).asInteger())
-                        .setSchema(schema)
-                        .setFormatOptions(FormatOptions.of(fileType))
+            if(type.equals(FormatOptions.csv().getType())) {
+                formatOption = FormatOptions.csv().toBuilder()
+                        
.setAllowJaggedRows(context.getProperty(CSV_ALLOW_JAGGED_ROWS).asBoolean())
+                        
.setAllowQuotedNewLines(context.getProperty(CSV_ALLOW_QUOTED_NEW_LINES).asBoolean())
+                        
.setEncoding(context.getProperty(CSV_CHARSET).getValue())
+                        
.setFieldDelimiter(context.getProperty(CSV_FIELD_DELIMITER).evaluateAttributeExpressions(flowFile).getValue())
+                        
.setQuote(context.getProperty(CSV_QUOTE).evaluateAttributeExpressions(flowFile).getValue())
+                        
.setSkipLeadingRows(context.getProperty(CSV_SKIP_LEADING_ROWS).evaluateAttributeExpressions(flowFile).asInteger())
                         .build();
+            } else {
+                formatOption = FormatOptions.of(type);
+            }
 
-        TableDataWriteChannel writer = bq.writer(writeChannelConfiguration);
-
-        try {
-            session.read(flowFile, rawIn -> {
-                ReadableByteChannel readableByteChannel = 
Channels.newChannel(rawIn);
-                ByteBuffer byteBuffer = ByteBuffer.allocateDirect(BUFFER_SIZE);
-                while (readableByteChannel.read(byteBuffer) >= 0) {
-                    byteBuffer.flip();
-                    writer.write(byteBuffer);
-                    byteBuffer.clear();
-                }
-            });
-
-            writer.close();
-
-            Job job = writer.getJob();
-            PropertyValue property = context.getProperty(READ_TIMEOUT);
-            Long timePeriod = 
property.evaluateAttributeExpressions().asTimePeriod(TimeUnit.SECONDS);
-            Duration duration = Duration.of(timePeriod, ChronoUnit.SECONDS);
-            job = job.waitFor(RetryOption.totalTimeout(duration));
-
-            if (job != null) {
-                attributes.put(BigQueryAttributes.JOB_CREATE_TIME_ATTR, 
Long.toString(job.getStatistics().getCreationTime()));
-                attributes.put(BigQueryAttributes.JOB_END_TIME_ATTR, 
Long.toString(job.getStatistics().getEndTime()));
-                attributes.put(BigQueryAttributes.JOB_START_TIME_ATTR, 
Long.toString(job.getStatistics().getStartTime()));
-                attributes.put(BigQueryAttributes.JOB_LINK_ATTR, 
job.getSelfLink());
-
-                boolean jobError = (job.getStatus().getError() != null);
-
-                if (jobError) {
-                    attributes.put(BigQueryAttributes.JOB_ERROR_MSG_ATTR, 
job.getStatus().getError().getMessage());
-                    attributes.put(BigQueryAttributes.JOB_ERROR_REASON_ATTR, 
job.getStatus().getError().getReason());
-                    attributes.put(BigQueryAttributes.JOB_ERROR_LOCATION_ATTR, 
job.getStatus().getError().getLocation());
-                } else {
-                    // in case it got looped back from error
-                    flowFile = session.removeAttribute(flowFile, 
BigQueryAttributes.JOB_ERROR_MSG_ATTR);
-                    flowFile = session.removeAttribute(flowFile, 
BigQueryAttributes.JOB_ERROR_REASON_ATTR);
-                    flowFile = session.removeAttribute(flowFile, 
BigQueryAttributes.JOB_ERROR_LOCATION_ATTR);
-                }
-
-                if (!attributes.isEmpty()) {
-                    flowFile = session.putAllAttributes(flowFile, attributes);
-                }
-
-                if (jobError) {
-                    flowFile = session.penalize(flowFile);
-                    session.transfer(flowFile, REL_FAILURE);
-                } else {
-                    session.transfer(flowFile, REL_SUCCESS);
+            final Schema schema = 
BigQueryUtils.schemaFromString(context.getProperty(TABLE_SCHEMA).evaluateAttributeExpressions(flowFile).getValue());
+            final WriteChannelConfiguration writeChannelConfiguration =
+                    WriteChannelConfiguration.newBuilder(tableId)
+                    
.setCreateDisposition(JobInfo.CreateDisposition.valueOf(context.getProperty(CREATE_DISPOSITION).getValue()))
+                    
.setWriteDisposition(JobInfo.WriteDisposition.valueOf(context.getProperty(WRITE_DISPOSITION).getValue()))
+                    
.setIgnoreUnknownValues(context.getProperty(IGNORE_UNKNOWN).asBoolean())
+                    
.setMaxBadRecords(context.getProperty(MAXBAD_RECORDS).asInteger())
+                    .setSchema(schema)
+                    .setFormatOptions(formatOption)
+                    .build();
+
+            try ( TableDataWriteChannel writer = 
getCloudService().writer(writeChannelConfiguration) ) {
+
+                session.read(flowFile, rawIn -> {
+                    ReadableByteChannel readableByteChannel = 
Channels.newChannel(rawIn);
+                    ByteBuffer byteBuffer = 
ByteBuffer.allocateDirect(BUFFER_SIZE);
+                    while (readableByteChannel.read(byteBuffer) >= 0) {
+                        byteBuffer.flip();
+                        writer.write(byteBuffer);
+                        byteBuffer.clear();
+                    }
+                });
+
+                // writer must be closed to get the job
+                writer.close();
+
+                Job job = writer.getJob();
+                Long timePeriod = 
context.getProperty(READ_TIMEOUT).evaluateAttributeExpressions(flowFile).asTimePeriod(TimeUnit.SECONDS);
+                Duration waitFor = Duration.of(timePeriod, ChronoUnit.SECONDS);
+                job = job.waitFor(RetryOption.totalTimeout(waitFor));
+
+                if (job != null) {
+                    final Map<String, String> attributes = new HashMap<>();
+
+                    attributes.put(BigQueryAttributes.JOB_CREATE_TIME_ATTR, 
Long.toString(job.getStatistics().getCreationTime()));
+                    attributes.put(BigQueryAttributes.JOB_END_TIME_ATTR, 
Long.toString(job.getStatistics().getEndTime()));
+                    attributes.put(BigQueryAttributes.JOB_START_TIME_ATTR, 
Long.toString(job.getStatistics().getStartTime()));
+                    attributes.put(BigQueryAttributes.JOB_LINK_ATTR, 
job.getSelfLink());
+
+                    boolean jobError = (job.getStatus().getError() != null);
+
+                    if (jobError) {
+                        attributes.put(BigQueryAttributes.JOB_ERROR_MSG_ATTR, 
job.getStatus().getError().getMessage());
+                        
attributes.put(BigQueryAttributes.JOB_ERROR_REASON_ATTR, 
job.getStatus().getError().getReason());
+                        
attributes.put(BigQueryAttributes.JOB_ERROR_LOCATION_ATTR, 
job.getStatus().getError().getLocation());
+                    } else {
+                        // in case it got looped back from error
+                        flowFile = session.removeAttribute(flowFile, 
BigQueryAttributes.JOB_ERROR_MSG_ATTR);
+                        flowFile = session.removeAttribute(flowFile, 
BigQueryAttributes.JOB_ERROR_REASON_ATTR);
+                        flowFile = session.removeAttribute(flowFile, 
BigQueryAttributes.JOB_ERROR_LOCATION_ATTR);
+
+                        // add the number of records successfully added
+                        if(job.getStatistics() instanceof LoadStatistics) {
+                            final LoadStatistics stats = (LoadStatistics) 
job.getStatistics();
+                            
attributes.put(BigQueryAttributes.JOB_NB_RECORDS_ATTR, 
Long.toString(stats.getOutputRows()));
+                        }
+                    }
+
+                    if (!attributes.isEmpty()) {
+                        flowFile = session.putAllAttributes(flowFile, 
attributes);
+                    }
+
+                    if (jobError) {
+                        getLogger().log(LogLevel.WARN, 
job.getStatus().getError().getMessage());
+                        flowFile = session.penalize(flowFile);
+                        session.transfer(flowFile, REL_FAILURE);
+                    } else {
+                        session.getProvenanceReporter().send(flowFile, 
job.getSelfLink(), job.getStatistics().getEndTime() - 
job.getStatistics().getStartTime());
+                        session.transfer(flowFile, REL_SUCCESS);
+                    }
                 }
             }
 
@@ -266,4 +353,5 @@ public class PutBigQueryBatch extends AbstractBigQueryProcessor {
             session.transfer(flowFile, REL_FAILURE);
         }
     }
-}
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/03ef6465/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/AbstractGCPubSubProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/AbstractGCPubSubProcessor.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/AbstractGCPubSubProcessor.java
index 31dc0a7..8930a27 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/AbstractGCPubSubProcessor.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/AbstractGCPubSubProcessor.java
@@ -18,13 +18,17 @@ package org.apache.nifi.processors.gcp.pubsub;
 
 import com.google.auth.oauth2.GoogleCredentials;
 import com.google.cloud.ServiceOptions;
+
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processors.gcp.AbstractGCPProcessor;
 
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
@@ -62,4 +66,21 @@ public abstract class AbstractGCPubSubProcessor extends AbstractGCPProcessor {
     protected ServiceOptions getServiceOptions(ProcessContext context, 
GoogleCredentials credentials) {
         return null;
     }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext 
validationContext) {
+        final Collection<ValidationResult> results = 
super.customValidate(validationContext);
+
+        final boolean projectId = 
validationContext.getProperty(PROJECT_ID).isSet();
+        if (!projectId) {
+            results.add(new ValidationResult.Builder()
+                    .subject(PROJECT_ID.getName())
+                    .valid(false)
+                    .explanation("The Project ID must be set for this 
processor.")
+                    .build());
+        }
+
+        return results;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/03ef6465/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/AbstractGCSProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/AbstractGCSProcessor.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/AbstractGCSProcessor.java
index 7c63631..fba984e 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/AbstractGCSProcessor.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/AbstractGCSProcessor.java
@@ -16,8 +16,6 @@
  */
 package org.apache.nifi.processors.gcp.storage;
 
-import java.net.Proxy;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -36,7 +34,6 @@ import org.apache.nifi.proxy.ProxyConfiguration;
 
 import com.google.api.gax.retrying.RetrySettings;
 import com.google.auth.oauth2.GoogleCredentials;
-import com.google.cloud.http.HttpTransportOptions;
 import com.google.cloud.storage.Storage;
 import com.google.cloud.storage.StorageOptions;
 import com.google.common.collect.ImmutableList;
@@ -73,7 +70,7 @@ public abstract class AbstractGCSProcessor extends AbstractGCPProcessor<Storage,
 
     @Override
     protected final Collection<ValidationResult> 
customValidate(ValidationContext validationContext) {
-        final Collection<ValidationResult> results = new ArrayList<>();
+        final Collection<ValidationResult> results = 
super.customValidate(validationContext);
         ProxyConfiguration.validateProxySpec(validationContext, results, 
ProxyAwareTransportFactory.PROXY_SPECS);
         customValidate(validationContext, results);
         return results;
@@ -96,30 +93,10 @@ public abstract class AbstractGCSProcessor extends AbstractGCPProcessor<Storage,
                         .setMaxAttempts(retryCount)
                         .build());
 
-        final ProxyConfiguration proxyConfiguration = 
ProxyConfiguration.getConfiguration(context, () -> {
-            final String proxyHost = 
context.getProperty(PROXY_HOST).getValue();
-            final Integer proxyPort = 
context.getProperty(PROXY_PORT).asInteger();
-            if (proxyHost != null && proxyPort != null && proxyPort > 0) {
-                final ProxyConfiguration componentProxyConfig = new 
ProxyConfiguration();
-                final String proxyUser = 
context.getProperty(HTTP_PROXY_USERNAME).evaluateAttributeExpressions().getValue();
-                final String proxyPassword = 
context.getProperty(HTTP_PROXY_PASSWORD).evaluateAttributeExpressions().getValue();
-                componentProxyConfig.setProxyType(Proxy.Type.HTTP);
-                componentProxyConfig.setProxyServerHost(proxyHost);
-                componentProxyConfig.setProxyServerPort(proxyPort);
-                componentProxyConfig.setProxyUserName(proxyUser);
-                componentProxyConfig.setProxyUserPassword(proxyPassword);
-                return componentProxyConfig;
-            }
-            return ProxyConfiguration.DIRECT_CONFIGURATION;
-        });
-
         if (!projectId.isEmpty()) {
             storageOptionsBuilder.setProjectId(projectId);
         }
 
-        final ProxyAwareTransportFactory transportFactory = new 
ProxyAwareTransportFactory(proxyConfiguration);
-        
storageOptionsBuilder.setTransportOptions(HttpTransportOptions.newBuilder().setHttpTransportFactory(transportFactory).build());
-
-        return  storageOptionsBuilder.build();
+        return  
storageOptionsBuilder.setTransportOptions(getTransportOptions(context)).build();
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/03ef6465/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-services-api/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-services-api/pom.xml b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-services-api/pom.xml
index 658350e..98aa1ee 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-services-api/pom.xml
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-services-api/pom.xml
@@ -33,7 +33,7 @@
         <dependency>
             <groupId>com.google.auth</groupId>
             <artifactId>google-auth-library-oauth2-http</artifactId>
-            <version>0.9.0</version>
+            <version>0.12.0</version>
             <exclusions>
                 <exclusion>
                     <groupId>com.google.code.findbugs</groupId>

http://git-wip-us.apache.org/repos/asf/nifi/blob/03ef6465/nifi-nar-bundles/nifi-gcp-bundle/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/pom.xml b/nifi-nar-bundles/nifi-gcp-bundle/pom.xml
index bb459a8..c1300a4 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/pom.xml
+++ b/nifi-nar-bundles/nifi-gcp-bundle/pom.xml
@@ -27,14 +27,15 @@
     <packaging>pom</packaging>
 
     <properties>
-        <google.cloud.sdk.version>0.47.0-alpha</google.cloud.sdk.version>
+        <google.cloud.sdk.version>0.71.0-alpha</google.cloud.sdk.version>
     </properties>
 
     <dependencyManagement>
         <dependencies>
             <dependency>
+               <!-- https://github.com/GoogleCloudPlatform/google-cloud-java/tree/master/google-cloud-bom -->
                 <groupId>com.google.cloud</groupId>
-                <artifactId>google-cloud</artifactId>
+                <artifactId>google-cloud-bom</artifactId>
                 <version>${google.cloud.sdk.version}</version>
                 <type>pom</type>
                 <scope>import</scope>

Reply via email to