This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 32a36209840b6823f6e7a72990bf124d44d51a79
Author: Christoph Deppisch <[email protected]>
AuthorDate: Fri Nov 30 10:42:47 2018 +0100

    CAMEL-12967 - Adding new option to split value ranges to a stream of rows 
each represented in a separate exchange
---
 .../main/docs/google-sheets-stream-component.adoc  |  3 +-
 .../sheets/stream/GoogleSheetsStreamComponent.java |  8 ++--
 .../stream/GoogleSheetsStreamConfiguration.java    | 18 ++++++++
 .../sheets/stream/GoogleSheetsStreamConstants.java |  2 +
 .../sheets/stream/GoogleSheetsStreamConsumer.java  | 29 +++++++++---
 .../sheets/stream/GoogleSheetsStreamEndpoint.java  | 30 +++++++++----
 .../SheetsStreamConsumerIntegrationTest.java       | 52 ++++++++++++++++++++++
 .../GoogleSheetsStreamComponentConfiguration.java  | 15 +++++++
 8 files changed, 138 insertions(+), 19 deletions(-)

diff --git 
a/components/camel-google-sheets/src/main/docs/google-sheets-stream-component.adoc
 
b/components/camel-google-sheets/src/main/docs/google-sheets-stream-component.adoc
index 63555c9..bc58b0b 100644
--- 
a/components/camel-google-sheets/src/main/docs/google-sheets-stream-component.adoc
+++ 
b/components/camel-google-sheets/src/main/docs/google-sheets-stream-component.adoc
@@ -78,7 +78,7 @@ with the following path and query parameters:
 |===
 
 
-==== Query Parameters (31 parameters):
+==== Query Parameters (32 parameters):
 
 
 [width="100%",cols="2,5,^1,2",options="header"]
@@ -96,6 +96,7 @@ with the following path and query parameters:
 | *refreshToken* (consumer) | OAuth 2 refresh token. Using this, the Google 
Calendar component can obtain a new accessToken whenever the current one 
expires - a necessity if the application is long-lived. |  | String
 | *scopes* (consumer) | Specifies the level of permissions you want a sheets 
application to have to a user account. See 
https://developers.google.com/identity/protocols/googlescopes for more info. |  
| List
 | *sendEmptyMessageWhenIdle* (consumer) | If the polling consumer did not poll 
any files, you can enable this option to send an empty message (no body) 
instead. | false | boolean
+| *splitResults* (consumer) | True if value range result should be split into 
rows or columns to process each of them individually. When true each row or 
column is represented with a separate exchange in batch processing. Otherwise 
value range object is used as exchange chunk size. | false | boolean
 | *spreadsheetId* (consumer) | Specifies the spreadsheet identifier that is 
used to identify the target to obtain. |  | String
 | *valueRenderOption* (consumer) | Determines how values should be rendered in 
the output. | FORMATTED_VALUE | String
 | *exceptionHandler* (consumer) | To let the consumer use a custom 
ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this 
option is not in use. By default the consumer will deal with exceptions, that 
will be logged at WARN or ERROR level and ignored. |  | ExceptionHandler
diff --git 
a/components/camel-google-sheets/src/main/java/org/apache/camel/component/google/sheets/stream/GoogleSheetsStreamComponent.java
 
b/components/camel-google-sheets/src/main/java/org/apache/camel/component/google/sheets/stream/GoogleSheetsStreamComponent.java
index d461037..d602326 100644
--- 
a/components/camel-google-sheets/src/main/java/org/apache/camel/component/google/sheets/stream/GoogleSheetsStreamComponent.java
+++ 
b/components/camel-google-sheets/src/main/java/org/apache/camel/component/google/sheets/stream/GoogleSheetsStreamComponent.java
@@ -52,9 +52,11 @@ public class GoogleSheetsStreamComponent extends 
DefaultComponent {
 
     public Sheets getClient(GoogleSheetsStreamConfiguration 
endpointConfiguration) {
         if (client == null) {
-            client = 
getClientFactory().makeClient(endpointConfiguration.getClientId(), 
endpointConfiguration.getClientSecret(),
-                                                    
endpointConfiguration.getApplicationName(), 
endpointConfiguration.getRefreshToken(),
-                                                    
endpointConfiguration.getAccessToken());
+            client = 
getClientFactory().makeClient(endpointConfiguration.getClientId(),
+                                                
endpointConfiguration.getClientSecret(),
+                                                
endpointConfiguration.getApplicationName(),
+                                                
endpointConfiguration.getRefreshToken(),
+                                                
endpointConfiguration.getAccessToken());
         }
         return client;
     }
diff --git 
a/components/camel-google-sheets/src/main/java/org/apache/camel/component/google/sheets/stream/GoogleSheetsStreamConfiguration.java
 
b/components/camel-google-sheets/src/main/java/org/apache/camel/component/google/sheets/stream/GoogleSheetsStreamConfiguration.java
index e72446c..4de5222 100644
--- 
a/components/camel-google-sheets/src/main/java/org/apache/camel/component/google/sheets/stream/GoogleSheetsStreamConfiguration.java
+++ 
b/components/camel-google-sheets/src/main/java/org/apache/camel/component/google/sheets/stream/GoogleSheetsStreamConfiguration.java
@@ -66,6 +66,9 @@ public class GoogleSheetsStreamConfiguration implements 
Cloneable {
     @UriParam
     private boolean includeGridData;
 
+    @UriParam
+    private boolean splitResults;
+
     @UriParam(enums = "ROWS,COLUMNS,DIMENSION_UNSPECIFIED", defaultValue = 
"ROWS")
     private String majorDimension = "ROWS";
 
@@ -240,6 +243,21 @@ public class GoogleSheetsStreamConfiguration implements 
Cloneable {
         this.includeGridData = includeGridData;
     }
 
+    public boolean isSplitResults() {
+        return splitResults;
+    }
+
+    /**
+     * True if value range result should be split into rows or columns to 
process each of them individually. When true
+     * each row or column is represented with a separate exchange in batch 
processing. Otherwise value range object is used
+     * as exchange chunk size.
+     *
+     * @param splitResults
+     */
+    public void setSplitResults(boolean splitResults) {
+        this.splitResults = splitResults;
+    }
+
     // *************************************************
     //
     // *************************************************
diff --git 
a/components/camel-google-sheets/src/main/java/org/apache/camel/component/google/sheets/stream/GoogleSheetsStreamConstants.java
 
b/components/camel-google-sheets/src/main/java/org/apache/camel/component/google/sheets/stream/GoogleSheetsStreamConstants.java
index d721585..6536588 100644
--- 
a/components/camel-google-sheets/src/main/java/org/apache/camel/component/google/sheets/stream/GoogleSheetsStreamConstants.java
+++ 
b/components/camel-google-sheets/src/main/java/org/apache/camel/component/google/sheets/stream/GoogleSheetsStreamConstants.java
@@ -27,6 +27,8 @@ public final class GoogleSheetsStreamConstants {
     public static final String SPREADSHEET_URL =  PROPERTY_PREFIX + 
"SpreadsheetUrl";
     public static final String MAJOR_DIMENSION = PROPERTY_PREFIX + 
"MajorDimension";
     public static final String RANGE = PROPERTY_PREFIX + "Range";
+    public static final String RANGE_INDEX = PROPERTY_PREFIX + "RangeIndex";
+    public static final String VALUE_INDEX = PROPERTY_PREFIX + "ValueIndex";
 
     /**
      * Prevent instantiation.
diff --git 
a/components/camel-google-sheets/src/main/java/org/apache/camel/component/google/sheets/stream/GoogleSheetsStreamConsumer.java
 
b/components/camel-google-sheets/src/main/java/org/apache/camel/component/google/sheets/stream/GoogleSheetsStreamConsumer.java
index 539dcaf..35b8815 100644
--- 
a/components/camel-google-sheets/src/main/java/org/apache/camel/component/google/sheets/stream/GoogleSheetsStreamConsumer.java
+++ 
b/components/camel-google-sheets/src/main/java/org/apache/camel/component/google/sheets/stream/GoogleSheetsStreamConsumer.java
@@ -20,11 +20,13 @@ import java.util.ArrayDeque;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Queue;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
 import com.google.api.services.sheets.v4.Sheets;
 import com.google.api.services.sheets.v4.model.BatchGetValuesResponse;
 import com.google.api.services.sheets.v4.model.Spreadsheet;
+import com.google.api.services.sheets.v4.model.ValueRange;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
@@ -70,8 +72,8 @@ public class GoogleSheetsStreamConsumer extends 
ScheduledBatchPollingConsumer {
 
             if (getConfiguration().getRange().contains(",")) {
                 
request.setRanges(Arrays.stream(getConfiguration().getRange().split(","))
-                                        .map(String::trim)
-                                        .collect(Collectors.toList()));
+                        .map(String::trim)
+                        .collect(Collectors.toList()));
             } else {
                 
request.setRanges(Collections.singletonList(getConfiguration().getRange()));
             }
@@ -79,11 +81,24 @@ public class GoogleSheetsStreamConsumer extends 
ScheduledBatchPollingConsumer {
             BatchGetValuesResponse response = request.execute();
 
             if (response.getValueRanges() != null) {
-                response.getValueRanges()
-                        .stream()
-                        .limit(getConfiguration().getMaxResults())
-                        .map(valueRange -> 
getEndpoint().createExchange(valueRange))
-                        .forEach(answer::add);
+                if (getConfiguration().isSplitResults()) {
+                    for (ValueRange valueRange : response.getValueRanges()) {
+                        AtomicInteger rangeIndex = new AtomicInteger(1);
+                        AtomicInteger valueIndex = new AtomicInteger();
+                        valueRange.getValues().stream()
+                            .limit(getConfiguration().getMaxResults())
+                            .map(values -> 
getEndpoint().createExchange(rangeIndex.get(), valueIndex.incrementAndGet(), 
valueRange.getRange(), valueRange.getMajorDimension(), values))
+                            .forEach(answer::add);
+                        rangeIndex.incrementAndGet();
+                    }
+                } else {
+                    AtomicInteger rangeIndex = new AtomicInteger();
+                    response.getValueRanges()
+                            .stream()
+                            .limit(getConfiguration().getMaxResults())
+                            .map(valueRange -> 
getEndpoint().createExchange(rangeIndex.incrementAndGet(), valueRange))
+                            .forEach(answer::add);
+                }
             }
         } else {
             Sheets.Spreadsheets.Get request = 
getClient().spreadsheets().get(getConfiguration().getSpreadsheetId());
diff --git 
a/components/camel-google-sheets/src/main/java/org/apache/camel/component/google/sheets/stream/GoogleSheetsStreamEndpoint.java
 
b/components/camel-google-sheets/src/main/java/org/apache/camel/component/google/sheets/stream/GoogleSheetsStreamEndpoint.java
index 8ae9f89..6d2adaf 100644
--- 
a/components/camel-google-sheets/src/main/java/org/apache/camel/component/google/sheets/stream/GoogleSheetsStreamEndpoint.java
+++ 
b/components/camel-google-sheets/src/main/java/org/apache/camel/component/google/sheets/stream/GoogleSheetsStreamEndpoint.java
@@ -16,10 +16,11 @@
  */
 package org.apache.camel.component.google.sheets.stream;
 
+import java.util.List;
+
 import com.google.api.services.sheets.v4.Sheets;
 import com.google.api.services.sheets.v4.model.Spreadsheet;
 import com.google.api.services.sheets.v4.model.ValueRange;
-
 import org.apache.camel.Consumer;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
@@ -34,12 +35,12 @@ import org.apache.camel.spi.UriParam;
  * The google-sheets component provides access to Google Sheets.
  */
 @UriEndpoint(firstVersion = "2.23.0",
-             scheme = "google-sheets-stream",
-             title = "Google Sheets Stream",
-             syntax = "google-sheets-stream:apiName",
-             consumerClass = GoogleSheetsStreamConsumer.class,
-             consumerOnly = true,
-             label = "api,cloud,sheets")
+        scheme = "google-sheets-stream",
+        title = "Google Sheets Stream",
+        syntax = "google-sheets-stream:apiName",
+        consumerClass = GoogleSheetsStreamConsumer.class,
+        consumerOnly = true,
+        label = "api,cloud,sheets")
 public class GoogleSheetsStreamEndpoint extends ScheduledPollEndpoint {
 
     @UriParam
@@ -83,16 +84,29 @@ public class GoogleSheetsStreamEndpoint extends 
ScheduledPollEndpoint {
         return true;
     }
 
-    public Exchange createExchange(ValueRange valueRange) {
+    public Exchange createExchange(int rangeIndex, ValueRange valueRange) {
         Exchange exchange = super.createExchange(getExchangePattern());
         Message message = exchange.getIn();
         exchange.getIn().setHeader(GoogleSheetsStreamConstants.SPREADSHEET_ID, 
configuration.getSpreadsheetId());
         exchange.getIn().setHeader(GoogleSheetsStreamConstants.RANGE, 
valueRange.getRange());
+        exchange.getIn().setHeader(GoogleSheetsStreamConstants.RANGE_INDEX, 
rangeIndex);
         
exchange.getIn().setHeader(GoogleSheetsStreamConstants.MAJOR_DIMENSION, 
valueRange.getMajorDimension());
         message.setBody(valueRange);
         return exchange;
     }
 
+    public Exchange createExchange(int rangeIndex, int valueIndex, String 
range, String majorDimension, List<Object> values) {
+        Exchange exchange = super.createExchange(getExchangePattern());
+        Message message = exchange.getIn();
+        exchange.getIn().setHeader(GoogleSheetsStreamConstants.SPREADSHEET_ID, 
configuration.getSpreadsheetId());
+        exchange.getIn().setHeader(GoogleSheetsStreamConstants.RANGE_INDEX, 
rangeIndex);
+        exchange.getIn().setHeader(GoogleSheetsStreamConstants.VALUE_INDEX, 
valueIndex);
+        exchange.getIn().setHeader(GoogleSheetsStreamConstants.RANGE, range);
+        
exchange.getIn().setHeader(GoogleSheetsStreamConstants.MAJOR_DIMENSION, 
majorDimension);
+        message.setBody(values);
+        return exchange;
+    }
+
     public Exchange createExchange(Spreadsheet spreadsheet) {
         Exchange exchange = super.createExchange(getExchangePattern());
         Message message = exchange.getIn();
diff --git 
a/components/camel-google-sheets/src/test/java/org/apache/camel/component/google/sheets/stream/SheetsStreamConsumerIntegrationTest.java
 
b/components/camel-google-sheets/src/test/java/org/apache/camel/component/google/sheets/stream/SheetsStreamConsumerIntegrationTest.java
index aa7c284..9a0a4bd 100644
--- 
a/components/camel-google-sheets/src/test/java/org/apache/camel/component/google/sheets/stream/SheetsStreamConsumerIntegrationTest.java
+++ 
b/components/camel-google-sheets/src/test/java/org/apache/camel/component/google/sheets/stream/SheetsStreamConsumerIntegrationTest.java
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.component.google.sheets.stream;
 
+import java.util.List;
+
 import com.google.api.services.sheets.v4.model.Spreadsheet;
 import com.google.api.services.sheets.v4.model.ValueRange;
 import org.apache.camel.Exchange;
@@ -26,7 +28,9 @@ import org.junit.Test;
 
 import static 
org.apache.camel.component.google.sheets.stream.GoogleSheetsStreamConstants.MAJOR_DIMENSION;
 import static 
org.apache.camel.component.google.sheets.stream.GoogleSheetsStreamConstants.RANGE;
+import static 
org.apache.camel.component.google.sheets.stream.GoogleSheetsStreamConstants.RANGE_INDEX;
 import static 
org.apache.camel.component.google.sheets.stream.GoogleSheetsStreamConstants.SPREADSHEET_ID;
+import static 
org.apache.camel.component.google.sheets.stream.GoogleSheetsStreamConstants.VALUE_INDEX;
 
 public class SheetsStreamConsumerIntegrationTest extends 
AbstractGoogleSheetsStreamTestSupport {
 
@@ -46,9 +50,11 @@ public class SheetsStreamConsumerIntegrationTest extends 
AbstractGoogleSheetsStr
         Exchange exchange = mock.getReceivedExchanges().get(0);
         
Assert.assertTrue(exchange.getIn().getHeaders().containsKey(SPREADSHEET_ID));
         Assert.assertTrue(exchange.getIn().getHeaders().containsKey(RANGE));
+        
Assert.assertTrue(exchange.getIn().getHeaders().containsKey(RANGE_INDEX));
         
Assert.assertTrue(exchange.getIn().getHeaders().containsKey(MAJOR_DIMENSION));
         Assert.assertEquals(testSheet.getSpreadsheetId(), 
exchange.getIn().getHeaders().get(SPREADSHEET_ID));
         Assert.assertEquals(TEST_SHEET + "!" + range, 
exchange.getIn().getHeaders().get(RANGE));
+        Assert.assertEquals(1, exchange.getIn().getHeaders().get(RANGE_INDEX));
         Assert.assertEquals("ROWS", 
exchange.getIn().getHeaders().get(MAJOR_DIMENSION));
 
         ValueRange values = (ValueRange) exchange.getIn().getBody();
@@ -59,11 +65,57 @@ public class SheetsStreamConsumerIntegrationTest extends 
AbstractGoogleSheetsStr
         Assert.assertEquals("b2", values.getValues().get(1).get(1));
     }
 
+    @Test
+    public void testConsumeRowValues() throws Exception {
+        Spreadsheet testSheet = getSpreadsheetWithTestData();
+
+        
context().addRoutes(createGoogleStreamRouteBuilder(testSheet.getSpreadsheetId()));
+        context().startRoute("google-stream-values-test");
+
+        MockEndpoint mock = getMockEndpoint("mock:rows");
+        mock.expectedMinimumMessageCount(2);
+        assertMockEndpointsSatisfied();
+
+        Exchange exchange = mock.getReceivedExchanges().get(0);
+        
Assert.assertTrue(exchange.getIn().getHeaders().containsKey(SPREADSHEET_ID));
+        Assert.assertTrue(exchange.getIn().getHeaders().containsKey(RANGE));
+        
Assert.assertTrue(exchange.getIn().getHeaders().containsKey(RANGE_INDEX));
+        
Assert.assertTrue(exchange.getIn().getHeaders().containsKey(VALUE_INDEX));
+        
Assert.assertTrue(exchange.getIn().getHeaders().containsKey(MAJOR_DIMENSION));
+        Assert.assertEquals(testSheet.getSpreadsheetId(), 
exchange.getIn().getHeaders().get(SPREADSHEET_ID));
+        Assert.assertEquals(TEST_SHEET + "!" + range, 
exchange.getIn().getHeaders().get(RANGE));
+        Assert.assertEquals(1, exchange.getIn().getHeaders().get(RANGE_INDEX));
+        Assert.assertEquals(1, exchange.getIn().getHeaders().get(VALUE_INDEX));
+        Assert.assertEquals("ROWS", 
exchange.getIn().getHeaders().get(MAJOR_DIMENSION));
+
+        List<?> values = (List) exchange.getIn().getBody();
+        Assert.assertEquals(2L, values.size());
+        Assert.assertEquals("a1", values.get(0));
+        Assert.assertEquals("b1", values.get(1));
+
+        exchange = mock.getReceivedExchanges().get(1);
+        
Assert.assertTrue(exchange.getIn().getHeaders().containsKey(SPREADSHEET_ID));
+        Assert.assertTrue(exchange.getIn().getHeaders().containsKey(RANGE));
+        
Assert.assertTrue(exchange.getIn().getHeaders().containsKey(RANGE_INDEX));
+        
Assert.assertTrue(exchange.getIn().getHeaders().containsKey(VALUE_INDEX));
+        
Assert.assertTrue(exchange.getIn().getHeaders().containsKey(MAJOR_DIMENSION));
+        Assert.assertEquals(testSheet.getSpreadsheetId(), 
exchange.getIn().getHeaders().get(SPREADSHEET_ID));
+        Assert.assertEquals(1, exchange.getIn().getHeaders().get(RANGE_INDEX));
+        Assert.assertEquals(2, exchange.getIn().getHeaders().get(VALUE_INDEX));
+
+        values = (List) exchange.getIn().getBody();
+        Assert.assertEquals(2L, values.size());
+        Assert.assertEquals("a2", values.get(0));
+        Assert.assertEquals("b2", values.get(1));
+    }
+
     private RouteBuilder createGoogleStreamRouteBuilder(String spreadsheetId) 
throws Exception {
         return new RouteBuilder() {
             @Override
             public void configure() {
                 from("google-sheets-stream://data?spreadsheetId=" + 
spreadsheetId + "&range=" + range + 
"&delay=2000&maxResults=5").routeId("google-stream-test").to("mock:result");
+
+                from("google-sheets-stream://data?spreadsheetId=" + 
spreadsheetId + "&range=" + range + 
"&delay=2000&maxResults=5&splitResults=true").routeId("google-stream-values-test").to("mock:rows");
             }
         };
     }
diff --git 
a/platforms/spring-boot/components-starter/camel-google-sheets-starter/src/main/java/org/apache/camel/component/google/sheets/stream/springboot/GoogleSheetsStreamComponentConfiguration.java
 
b/platforms/spring-boot/components-starter/camel-google-sheets-starter/src/main/java/org/apache/camel/component/google/sheets/stream/springboot/GoogleSheetsStreamComponentConfiguration.java
index 3dbff38..8df3880 100644
--- 
a/platforms/spring-boot/components-starter/camel-google-sheets-starter/src/main/java/org/apache/camel/component/google/sheets/stream/springboot/GoogleSheetsStreamComponentConfiguration.java
+++ 
b/platforms/spring-boot/components-starter/camel-google-sheets-starter/src/main/java/org/apache/camel/component/google/sheets/stream/springboot/GoogleSheetsStreamComponentConfiguration.java
@@ -144,6 +144,13 @@ public class GoogleSheetsStreamComponentConfiguration
          * True if grid data should be returned.
          */
         private Boolean includeGridData = false;
+        /**
+         * True if value range result should be split into rows or columns to
+         * process each of them individually. When true each row or column is
+         * represented with a separate exchange in batch processing. Otherwise
+         * value range object is used as exchange chunk size.
+         */
+        private Boolean splitResults = false;
 
         public String getClientId() {
             return clientId;
@@ -248,5 +255,13 @@ public class GoogleSheetsStreamComponentConfiguration
         public void setIncludeGridData(Boolean includeGridData) {
             this.includeGridData = includeGridData;
         }
+
+        public Boolean getSplitResults() {
+            return splitResults;
+        }
+
+        public void setSplitResults(Boolean splitResults) {
+            this.splitResults = splitResults;
+        }
     }
 }
\ No newline at end of file

Reply via email to