arpadboda commented on code in PR #6344:
URL: https://github.com/apache/nifi/pull/6344#discussion_r972165449
##########
nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQuery.java:
##########
@@ -382,12 +368,11 @@ private WriteStream createWriteStream() {
}
protected BigQueryWriteClient createWriteClient(GoogleCredentials
credentials) {
- BigQueryWriteClient client = null;
+ BigQueryWriteClient client;
try {
client =
BigQueryWriteClient.create(BigQueryWriteSettings.newBuilder().setCredentialsProvider(FixedCredentialsProvider.create(credentials)).build());
- } catch (IOException e) {
- getLogger().error("Failed to create Big Query Write Client for
writing due to {}", new Object[] {e});
- setupException.set(e);
+ } catch (Exception e) {
+ throw new ProcessException("Failed to create Big Query Write
Client for writing due to", e);
Review Comment:
I think there is a better exception type for this (maybe a process schedule
exception — I can't tell off the top of my head).
##########
nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQuery.java:
##########
@@ -0,0 +1,457 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.gcp.bigquery;
+
+import com.google.api.core.ApiFuture;
+import com.google.api.core.ApiFutureCallback;
+import com.google.api.core.ApiFutures;
+import com.google.api.gax.core.FixedCredentialsProvider;
+import com.google.auth.oauth2.GoogleCredentials;
+import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
+import com.google.cloud.bigquery.storage.v1.BatchCommitWriteStreamsRequest;
+import com.google.cloud.bigquery.storage.v1.BatchCommitWriteStreamsResponse;
+import com.google.cloud.bigquery.storage.v1.BQTableSchemaToProtoDescriptor;
+import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
+import com.google.cloud.bigquery.storage.v1.BigQueryWriteSettings;
+import com.google.cloud.bigquery.storage.v1.CivilTimeEncoder;
+import com.google.cloud.bigquery.storage.v1.CreateWriteStreamRequest;
+import com.google.cloud.bigquery.storage.v1.Exceptions;
+import com.google.cloud.bigquery.storage.v1.ProtoRows;
+import com.google.cloud.bigquery.storage.v1.ProtoSchema;
+import com.google.cloud.bigquery.storage.v1.ProtoSchemaConverter;
+import com.google.cloud.bigquery.storage.v1.StorageError;
+import com.google.cloud.bigquery.storage.v1.StreamWriter;
+import com.google.cloud.bigquery.storage.v1.TableName;
+import com.google.cloud.bigquery.storage.v1.WriteStream;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.DynamicMessage;
+import io.grpc.Status;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.gcp.bigquery.proto.ProtoUtils;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.Phaser;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+@TriggerSerially
+@EventDriven
+@Tags({"google", "google cloud", "bq", "bigquery"})
+@CapabilityDescription("Unified processor for batch and stream flow files
content to a Google BigQuery table via the Storage Write API." +
+ "The processor is record based so the used schema is driven by the
RecordReader. Attributes that are not matched to the target schema" +
+ "are skipped. Exactly once delivery semantics are achieved via stream
offsets. The Storage Write API is more efficient than the older " +
+ "insertAll method because it uses gRPC streaming rather than REST over
HTTP")
+@SeeAlso({PutBigQueryBatch.class, PutBigQueryStreaming.class})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@WritesAttributes({
+ @WritesAttribute(attribute = BigQueryAttributes.JOB_NB_RECORDS_ATTR,
description = BigQueryAttributes.JOB_NB_RECORDS_DESC)
+})
+public class PutBigQuery extends AbstractBigQueryProcessor {
+
+ static final String STREAM = "STREAM";
+ static final String BATCH = "BATCH";
+ static final AllowableValue STREAM_TYPE = new AllowableValue(STREAM,
STREAM, "Define streaming approach.");
+ static final AllowableValue BATCH_TYPE = new AllowableValue(BATCH, BATCH,
"Define batching approach.");
+
+ private static final String APPEND_RECORD_COUNT_NAME =
"bq.append.record.count";
+ private static final String APPEND_RECORD_COUNT_DESC = "The number of
records to be appended to the write stream at once. Applicable for both batch
and stream types.";
+ private static final String TRANSFER_TYPE_NAME = "bq.transfer.type";
+ private static final String TRANSFER_TYPE_DESC = "Defines the preferred
transfer type streaming or batching.";
+
+ private static final List<Status.Code> RETRYABLE_ERROR_CODES =
Arrays.asList(Status.Code.INTERNAL, Status.Code.ABORTED, Status.Code.CANCELLED);
+
+ private final AtomicReference<RuntimeException> error = new
AtomicReference<>();
+ private final AtomicInteger appendSuccessCount = new AtomicInteger(0);
+ private final Phaser inflightRequestCount = new Phaser(1);
+ private TableName tableName = null;
+ private BigQueryWriteClient writeClient = null;
+ private StreamWriter streamWriter = null;
+ private String transferType;
+ private int maxRetryCount;
+ private int recordBatchCount;
+ private boolean skipInvalidRows;
+
+ static final PropertyDescriptor TRANSFER_TYPE = new
PropertyDescriptor.Builder()
+ .name(TRANSFER_TYPE_NAME)
+ .displayName("Transfer Type")
+ .description(TRANSFER_TYPE_DESC)
+ .required(true)
+ .defaultValue(STREAM_TYPE.getValue())
+ .allowableValues(STREAM_TYPE, BATCH_TYPE)
+ .build();
+
+ static final PropertyDescriptor APPEND_RECORD_COUNT = new
PropertyDescriptor.Builder()
+ .name(APPEND_RECORD_COUNT_NAME)
+ .displayName("Append Record Count")
+ .description(APPEND_RECORD_COUNT_DESC)
+ .required(true)
+ .defaultValue("20")
+ .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor RECORD_READER = new
PropertyDescriptor.Builder()
+ .name(BigQueryAttributes.RECORD_READER_ATTR)
+ .displayName("Record Reader")
+ .description(BigQueryAttributes.RECORD_READER_DESC)
+ .identifiesControllerService(RecordReaderFactory.class)
+ .required(true)
+ .build();
+
+ public static final PropertyDescriptor SKIP_INVALID_ROWS = new
PropertyDescriptor.Builder()
+ .name(BigQueryAttributes.SKIP_INVALID_ROWS_ATTR)
+ .displayName("Skip Invalid Rows")
+ .description(BigQueryAttributes.SKIP_INVALID_ROWS_DESC)
+ .required(true)
+ .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .defaultValue("false")
+ .build();
+
+ @Override
+ public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ List<PropertyDescriptor> descriptors = new
ArrayList<>(super.getSupportedPropertyDescriptors());
+ descriptors.add(TRANSFER_TYPE);
+ descriptors.add(RECORD_READER);
+ descriptors.add(APPEND_RECORD_COUNT);
+ descriptors.add(SKIP_INVALID_ROWS);
+ descriptors.remove(IGNORE_UNKNOWN);
+
+ return Collections.unmodifiableList(descriptors);
+ }
+
+ @Override
+ @OnScheduled
+ public void onScheduled(ProcessContext context) {
+ super.onScheduled(context);
+
+ transferType = context.getProperty(TRANSFER_TYPE).getValue();
+ maxRetryCount = context.getProperty(RETRY_COUNT).asInteger();
+ skipInvalidRows = context.getProperty(SKIP_INVALID_ROWS).asBoolean();
+ recordBatchCount =
context.getProperty(APPEND_RECORD_COUNT).asInteger();
+ tableName = TableName.of(context.getProperty(PROJECT_ID).getValue(),
context.getProperty(DATASET).getValue(),
context.getProperty(TABLE_NAME).getValue());
+ writeClient = createWriteClient(getGoogleCredentials(context));
+ }
+
+ @OnUnscheduled
+ public void onUnScheduled() {
+ writeClient.shutdown();
+ }
+
+ @Override
+ public void onTrigger(ProcessContext context, ProcessSession session)
throws ProcessException {
Review Comment:
Maybe I'm missing something, but I believe this no longer throws, since the
part that throws was moved to onScheduled.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]