bejancsaba commented on code in PR #6344:
URL: https://github.com/apache/nifi/pull/6344#discussion_r973576509


##########
nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryStreamingIT.java:
##########
@@ -47,6 +47,10 @@
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+/**
+ * @deprecated {@link PutBigQueryIT} is covering the needed validation for the 
new class.
+ */
+@Deprecated

Review Comment:
   Similarly to the other one, removed the test.



##########
nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQuery.java:
##########
@@ -0,0 +1,457 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.gcp.bigquery;
+
+import com.google.api.core.ApiFuture;
+import com.google.api.core.ApiFutureCallback;
+import com.google.api.core.ApiFutures;
+import com.google.api.gax.core.FixedCredentialsProvider;
+import com.google.auth.oauth2.GoogleCredentials;
+import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
+import com.google.cloud.bigquery.storage.v1.BatchCommitWriteStreamsRequest;
+import com.google.cloud.bigquery.storage.v1.BatchCommitWriteStreamsResponse;
+import com.google.cloud.bigquery.storage.v1.BQTableSchemaToProtoDescriptor;
+import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
+import com.google.cloud.bigquery.storage.v1.BigQueryWriteSettings;
+import com.google.cloud.bigquery.storage.v1.CivilTimeEncoder;
+import com.google.cloud.bigquery.storage.v1.CreateWriteStreamRequest;
+import com.google.cloud.bigquery.storage.v1.Exceptions;
+import com.google.cloud.bigquery.storage.v1.ProtoRows;
+import com.google.cloud.bigquery.storage.v1.ProtoSchema;
+import com.google.cloud.bigquery.storage.v1.ProtoSchemaConverter;
+import com.google.cloud.bigquery.storage.v1.StorageError;
+import com.google.cloud.bigquery.storage.v1.StreamWriter;
+import com.google.cloud.bigquery.storage.v1.TableName;
+import com.google.cloud.bigquery.storage.v1.WriteStream;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.DynamicMessage;
+import io.grpc.Status;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.gcp.bigquery.proto.ProtoUtils;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.Phaser;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+@TriggerSerially
+@EventDriven
+@Tags({"google", "google cloud", "bq", "bigquery"})
+@CapabilityDescription("Unified processor for batch and stream flow files 
content to a Google BigQuery table via the Storage Write API." +
+    "The processor is record based so the used schema is driven by the 
RecordReader. Attributes that are not matched to the target schema" +
+    "are skipped. Exactly once delivery semantics are achieved via stream 
offsets. The Storage Write API is more efficient than the older " +
+    "insertAll method because it uses gRPC streaming rather than REST over 
HTTP")
+@SeeAlso({PutBigQueryBatch.class, PutBigQueryStreaming.class})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@WritesAttributes({
+    @WritesAttribute(attribute = BigQueryAttributes.JOB_NB_RECORDS_ATTR, 
description = BigQueryAttributes.JOB_NB_RECORDS_DESC)
+})
+public class PutBigQuery extends AbstractBigQueryProcessor {
+
+    static final String STREAM = "STREAM";
+    static final String BATCH = "BATCH";
+    static final AllowableValue STREAM_TYPE = new AllowableValue(STREAM, 
STREAM, "Define streaming approach.");

Review Comment:
   Applied



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to