turcsanyip commented on code in PR #6379:
URL: https://github.com/apache/nifi/pull/6379#discussion_r977962708
##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/HubSpotObjectType.java:
##########
@@ -18,97 +18,107 @@
import org.apache.nifi.components.DescribedValue;
+import static
org.apache.nifi.processors.hubspot.IncrementalFieldType.HS_LAST_MODIFIED_DATE;
+import static
org.apache.nifi.processors.hubspot.IncrementalFieldType.LAST_MODIFIED_DATE;
+
public enum HubSpotObjectType implements DescribedValue {
COMPANIES(
"/crm/v3/objects/companies",
"Companies",
"In HubSpot, the companies object is a standard CRM object.
Individual company records can be used to store information about businesses" +
- " and organizations within company properties."
+ " and organizations within company properties.",
+ HS_LAST_MODIFIED_DATE
),
CONTACTS(
"/crm/v3/objects/contacts",
"Contacts",
"In HubSpot, contacts store information about individuals. From
marketing automation to smart content, the lead-specific data found in" +
- " contact records helps users leverage much of HubSpot's
functionality."
+ " contact records helps users leverage much of HubSpot's
functionality.",
+ LAST_MODIFIED_DATE
),
DEALS(
"/crm/v3/objects/deals",
"Deals",
"In HubSpot, a deal represents an ongoing transaction that a sales
team is pursuing with a contact or company. It’s tracked through" +
- " pipeline stages until won or lost."
+ " pipeline stages until won or lost.",
+ HS_LAST_MODIFIED_DATE
),
FEEDBACK_SUBMISSIONS(
Review Comment:
The Feedback Submissions API is currently in beta. I'd suggest removing it until
it becomes GA.
##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/resources/META-INF/NOTICE:
##########
@@ -0,0 +1,45 @@
+nifi-airtable-nar
+Copyright 2014-2022 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+**************************
+Apache Software License v2
+**************************
+
+ (ASLv2) Apache Commons Lang
+ The following NOTICE information applies:
+ Apache Commons Lang
+ Copyright 2001-2015 The Apache Software Foundation
+
+ This product includes software from the Spring Framework,
+ under the Apache License 2.0 (see: StringUtils.containsWhitespace())
Review Comment:
I cannot see an `Apache Commons Lang` dependency in the HubSpot bundle. If that
is correct, please remove this entry.
##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java:
##########
@@ -75,6 +82,10 @@
@DefaultSettings(yieldDuration = "10 sec")
public class GetHubSpot extends AbstractProcessor {
+ static final AllowableValue CREATE_DATE = new AllowableValue("createDate",
"Create Date", "The time of the field was created");
+ static final AllowableValue LAST_MODIFIED_DATE = new
AllowableValue("lastModifiedDate", "Last Modified Date",
+ "The time of the field was last modified");
+
Review Comment:
Unused constants.
##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java:
##########
@@ -99,7 +110,39 @@ public class GetHubSpot extends AbstractProcessor {
.description("The maximum number of results to request for each
invocation of the Processor")
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.required(false)
- .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .addValidator(StandardValidators.createLongValidator(1, 100, true))
+ .build();
+
+ static final PropertyDescriptor IS_INCREMENTAL = new
PropertyDescriptor.Builder()
+ .name("is-incremental")
+ .displayName("Incremental Loading")
+ .description("The processor can incrementally load the queried
objects so that each object is queried exactly once." +
+ " For each query, the processor queries objects which were
created or modified after the previous run time" +
+ " but before the current time.")
+ .required(true)
+ .allowableValues("true", "false")
+ .defaultValue("false")
+ .build();
+
+ static final PropertyDescriptor INCREMENTAL_DELAY = new
PropertyDescriptor.Builder()
+ .name("incremental-delay")
+ .displayName("Incremental Delay")
+ .description("The ending timestamp of the time window will be
adjusted earlier by the amount configured in this property." +
+ " For example, with a property value of 10 seconds, an
ending timestamp of 12:30:45 would be changed to 12:30:35.")
+ .required(false)
Review Comment:
I'd suggest making it required with a default value of `3s` because some
difference between local and server times is always expected.
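
To illustrate the arithmetic, here is a minimal sketch of how the delayed window end could be computed. `QueryWindow`, `delayedEnd` and `DEFAULT_DELAY` are hypothetical names used only for this example, not code from the PR, and the 3-second fallback is just the default proposed in this comment.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper, not part of the PR: shifts the end of the query window
// earlier so that small clock differences do not hide freshly modified objects.
final class QueryWindow {
    // Assumed default, taken from the 3s proposed in this review comment.
    static final Duration DEFAULT_DELAY = Duration.ofSeconds(3);

    // Returns the window end: "now" moved back by the configured delay,
    // or by the default delay when none is configured.
    static Instant delayedEnd(final Instant now, final Duration delay) {
        final Duration effective = delay != null ? delay : DEFAULT_DELAY;
        return now.minus(effective);
    }
}
```

With a delay of 10 seconds, an end timestamp of 12:30:45 becomes 12:30:35, which matches the example in the property description.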
##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java:
##########
@@ -154,17 +207,20 @@ public void onTrigger(final ProcessContext context, final
ProcessSession session
final String accessToken =
context.getProperty(ACCESS_TOKEN).getValue();
final String endpoint = context.getProperty(OBJECT_TYPE).getValue();
- final StateMap state = getStateMap(context);
- final URI uri = createUri(context, state);
+ final URI uri = getBaseUri(context);
- final HttpResponseEntity response = getHttpResponseEntity(accessToken,
uri);
- final AtomicInteger objectCountHolder = new AtomicInteger();
+ final AtomicInteger total = new AtomicInteger(-1);
+ final StateMap state = getStateMap(context);
+ final Map<String, String> stateMap = new HashMap<>(state.toMap());
+ final String filters = createIncrementalFilters(context, stateMap);
+ final HttpResponseEntity response = getHttpResponseEntity(accessToken,
uri, filters);
if (response.statusCode() == HttpResponseStatus.OK.getCode()) {
FlowFile flowFile = session.create();
- flowFile = session.write(flowFile, parseHttpResponse(context,
endpoint, state, response, objectCountHolder));
- if (objectCountHolder.get() > 0) {
+ flowFile = session.write(flowFile, parseHttpResponse(context,
response, total, stateMap));
+ if (total.get() > 0) {
session.transfer(flowFile, REL_SUCCESS);
+ updateState(context, stateMap);
} else {
getLogger().debug("Empty response when requested HubSpot
endpoint: [{}]", endpoint);
session.remove(flowFile);
Review Comment:
The processor should yield in case of empty response in order not to hit the
HubSpot API too frequently and unnecessarily when no new data is available.
##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java:
##########
@@ -187,61 +243,122 @@ private String getResponseBodyAsString(final
ProcessContext context, final HttpR
}
}
- private OutputStreamCallback parseHttpResponse(ProcessContext context,
String endpoint, StateMap state, HttpResponseEntity response, AtomicInteger
objectCountHolder) {
+ private OutputStreamCallback parseHttpResponse(final ProcessContext
context, final HttpResponseEntity response, final AtomicInteger total,
+ final Map<String, String>
stateMap) {
return out -> {
try (final JsonParser jsonParser =
JSON_FACTORY.createParser(response.body());
final JsonGenerator jsonGenerator =
JSON_FACTORY.createGenerator(out, JsonEncoding.UTF8)) {
+ boolean isCursorAvailable = false;
+ final String objectType =
context.getProperty(OBJECT_TYPE).getValue();
+ final String cursorKey = String.format(CURSOR_KEY_PATTERN,
objectType);
while (jsonParser.nextToken() != null) {
+ if (jsonParser.getCurrentToken() == JsonToken.FIELD_NAME
&& jsonParser.getCurrentName()
+ .equals("total")) {
+ jsonParser.nextToken();
+ total.set(jsonParser.getIntValue());
+ }
if (jsonParser.getCurrentToken() == JsonToken.FIELD_NAME
&& jsonParser.getCurrentName()
.equals("results")) {
jsonParser.nextToken();
jsonGenerator.copyCurrentStructure(jsonParser);
- objectCountHolder.incrementAndGet();
}
final String fieldName = jsonParser.getCurrentName();
- if (CURSOR_PARAMETER.equals(fieldName)) {
+ if (PAGING_CURSOR.equals(fieldName)) {
+ isCursorAvailable = true;
jsonParser.nextToken();
- Map<String, String> newStateMap = new
HashMap<>(state.toMap());
- newStateMap.put(endpoint, jsonParser.getText());
- updateState(context, newStateMap);
+ stateMap.put(cursorKey, jsonParser.getText());
break;
}
}
+ if (!isCursorAvailable) {
+ stateMap.put(cursorKey, NO_PAGING);
+ }
}
};
}
- HttpUriBuilder getBaseUri(final ProcessContext context) {
+ URI getBaseUri(final ProcessContext context) {
final String path = context.getProperty(OBJECT_TYPE).getValue();
return webClientServiceProvider.getHttpUriBuilder()
.scheme(HTTPS)
.host(API_BASE_URI)
- .encodedPath(path);
+ .encodedPath(path + "/search")
+ .build();
}
- private HttpResponseEntity getHttpResponseEntity(final String accessToken,
final URI uri) {
+ private HttpResponseEntity getHttpResponseEntity(final String accessToken,
final URI uri, final String filters) {
+ final JsonInputStreamConverter converter = new
JsonInputStreamConverter(filters);
Review Comment:
`IOUtils.toInputStream(filters, StandardCharsets.UTF_8)` could be used with
`available()` method for `contentLength`.
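
A rough sketch of the idea, using only the JDK so that it stays self-contained. `RequestBody.toStream` is a hypothetical stand-in for `IOUtils.toInputStream(filters, StandardCharsets.UTF_8)`, which, as far as I know, also returns an in-memory stream. Note that `available()` is only a reliable length for in-memory streams like this one.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration only.
final class RequestBody {
    // Wraps the JSON payload in an in-memory stream. For such a stream,
    // available() reports the number of remaining bytes, i.e. the body length.
    static ByteArrayInputStream toStream(final String json) {
        return new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8));
    }
}
```

The content length could then be taken as `OptionalLong.of(body.available())`, which counts UTF-8 bytes rather than characters.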
##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java:
##########
@@ -154,17 +207,20 @@ public void onTrigger(final ProcessContext context, final
ProcessSession session
final String accessToken =
context.getProperty(ACCESS_TOKEN).getValue();
final String endpoint = context.getProperty(OBJECT_TYPE).getValue();
- final StateMap state = getStateMap(context);
- final URI uri = createUri(context, state);
+ final URI uri = getBaseUri(context);
- final HttpResponseEntity response = getHttpResponseEntity(accessToken,
uri);
- final AtomicInteger objectCountHolder = new AtomicInteger();
+ final AtomicInteger total = new AtomicInteger(-1);
+ final StateMap state = getStateMap(context);
+ final Map<String, String> stateMap = new HashMap<>(state.toMap());
Review Comment:
The method could return `Map<String, String>` instead. `StateMap` is not
used directly.
##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java:
##########
@@ -117,18 +160,28 @@ public class GetHubSpot extends AbstractProcessor {
private static final String API_BASE_URI = "api.hubapi.com";
private static final String HTTPS = "https";
- private static final String CURSOR_PARAMETER = "after";
- private static final String LIMIT_PARAMETER = "limit";
private static final int TOO_MANY_REQUESTS = 429;
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final JsonFactory JSON_FACTORY = OBJECT_MAPPER.getFactory();
+ private static final Map<String, HubSpotObjectType> objectTypeLookupMap =
createObjectTypeLookupMap();
+ private static final String NO_PAGING = "no paging";
+ private static final String PAGING_CURSOR = "after";
+ private static final String CURSOR_KEY_PATTERN = "paging_next: %s";
+
+ private static Map<String, HubSpotObjectType> createObjectTypeLookupMap() {
+ return Arrays.stream(HubSpotObjectType.values())
+ .collect(Collectors.toMap(HubSpotObjectType::getValue,
Function.identity()));
+ }
private volatile WebClientServiceProvider webClientServiceProvider;
private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS =
Collections.unmodifiableList(Arrays.asList(
OBJECT_TYPE,
Review Comment:
The processor should reset its state when the user changes the Object Type.
##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java:
##########
@@ -99,7 +110,39 @@ public class GetHubSpot extends AbstractProcessor {
.description("The maximum number of results to request for each
invocation of the Processor")
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.required(false)
- .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .addValidator(StandardValidators.createLongValidator(1, 100, true))
+ .build();
+
+ static final PropertyDescriptor IS_INCREMENTAL = new
PropertyDescriptor.Builder()
+ .name("is-incremental")
+ .displayName("Incremental Loading")
+ .description("The processor can incrementally load the queried
objects so that each object is queried exactly once." +
+ " For each query, the processor queries objects which were
created or modified after the previous run time" +
+ " but before the current time.")
Review Comment:
```suggestion
" but before the current time (query time window).")
```
##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/resources/META-INF/LICENSE:
##########
@@ -0,0 +1,202 @@
+
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
Review Comment:
LICENSE / NOTICE files need to be added in the NAR module, not in the
processors module.
##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java:
##########
@@ -117,18 +160,28 @@ public class GetHubSpot extends AbstractProcessor {
private static final String API_BASE_URI = "api.hubapi.com";
private static final String HTTPS = "https";
- private static final String CURSOR_PARAMETER = "after";
- private static final String LIMIT_PARAMETER = "limit";
private static final int TOO_MANY_REQUESTS = 429;
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final JsonFactory JSON_FACTORY = OBJECT_MAPPER.getFactory();
+ private static final Map<String, HubSpotObjectType> objectTypeLookupMap =
createObjectTypeLookupMap();
Review Comment:
Please always use upper case for constants.
##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java:
##########
@@ -75,6 +82,10 @@
@DefaultSettings(yieldDuration = "10 sec")
public class GetHubSpot extends AbstractProcessor {
Review Comment:
`@TriggerWhenEmpty` is not applicable here because this is a source processor.
Please remove it.
Please also update the `@Stateful` description to cover the incremental loading state.
##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java:
##########
@@ -99,7 +110,39 @@ public class GetHubSpot extends AbstractProcessor {
.description("The maximum number of results to request for each
invocation of the Processor")
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.required(false)
- .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .addValidator(StandardValidators.createLongValidator(1, 100, true))
+ .build();
+
+ static final PropertyDescriptor IS_INCREMENTAL = new
PropertyDescriptor.Builder()
+ .name("is-incremental")
+ .displayName("Incremental Loading")
+ .description("The processor can incrementally load the queried
objects so that each object is queried exactly once." +
+ " For each query, the processor queries objects which were
created or modified after the previous run time" +
+ " but before the current time.")
+ .required(true)
+ .allowableValues("true", "false")
+ .defaultValue("false")
+ .build();
+
+ static final PropertyDescriptor INCREMENTAL_DELAY = new
PropertyDescriptor.Builder()
+ .name("incremental-delay")
+ .displayName("Incremental Delay")
+ .description("The ending timestamp of the time window will be
adjusted earlier by the amount configured in this property." +
+ " For example, with a property value of 10 seconds, an
ending timestamp of 12:30:45 would be changed to 12:30:35.")
+ .required(false)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .dependsOn(IS_INCREMENTAL, "true")
+ .build();
+
+ static final PropertyDescriptor INITIAL_INCREMENTAL_FILTER = new
PropertyDescriptor.Builder()
+ .name("initial-incremental-filter")
+ .displayName("Initial Incremental Start Time")
Review Comment:
It is a good practice to align the property name, display name and variable
name.
##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/HubSpotObjectType.java:
##########
@@ -18,97 +18,107 @@
import org.apache.nifi.components.DescribedValue;
+import static
org.apache.nifi.processors.hubspot.IncrementalFieldType.HS_LAST_MODIFIED_DATE;
+import static
org.apache.nifi.processors.hubspot.IncrementalFieldType.LAST_MODIFIED_DATE;
+
public enum HubSpotObjectType implements DescribedValue {
COMPANIES(
"/crm/v3/objects/companies",
"Companies",
"In HubSpot, the companies object is a standard CRM object.
Individual company records can be used to store information about businesses" +
- " and organizations within company properties."
+ " and organizations within company properties.",
+ HS_LAST_MODIFIED_DATE
Review Comment:
The Additional Details page is not in sync with this entity list. Please
update the documentation.
##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java:
##########
@@ -99,7 +110,39 @@ public class GetHubSpot extends AbstractProcessor {
.description("The maximum number of results to request for each
invocation of the Processor")
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.required(false)
- .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .addValidator(StandardValidators.createLongValidator(1, 100, true))
+ .build();
+
+ static final PropertyDescriptor IS_INCREMENTAL = new
PropertyDescriptor.Builder()
+ .name("is-incremental")
+ .displayName("Incremental Loading")
+ .description("The processor can incrementally load the queried
objects so that each object is queried exactly once." +
+ " For each query, the processor queries objects which were
created or modified after the previous run time" +
+ " but before the current time.")
+ .required(true)
+ .allowableValues("true", "false")
+ .defaultValue("false")
+ .build();
+
+ static final PropertyDescriptor INCREMENTAL_DELAY = new
PropertyDescriptor.Builder()
+ .name("incremental-delay")
+ .displayName("Incremental Delay")
+ .description("The ending timestamp of the time window will be
adjusted earlier by the amount configured in this property." +
+ " For example, with a property value of 10 seconds, an
ending timestamp of 12:30:45 would be changed to 12:30:35.")
Review Comment:
```suggestion
.description("The ending timestamp of the query time window will
be adjusted earlier by the amount configured in this property." +
" For example, with a property value of 10 seconds, an
ending timestamp of 12:30:45 would be changed to 12:30:35." +
" Set this property to avoid missing objects when the
clocks of your local machines and the HubSpot servers are not in sync.")
```
##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java:
##########
@@ -99,7 +110,39 @@ public class GetHubSpot extends AbstractProcessor {
.description("The maximum number of results to request for each
invocation of the Processor")
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.required(false)
- .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .addValidator(StandardValidators.createLongValidator(1, 100, true))
+ .build();
+
+ static final PropertyDescriptor IS_INCREMENTAL = new
PropertyDescriptor.Builder()
+ .name("is-incremental")
+ .displayName("Incremental Loading")
+ .description("The processor can incrementally load the queried
objects so that each object is queried exactly once." +
+ " For each query, the processor queries objects which were
created or modified after the previous run time" +
+ " but before the current time.")
+ .required(true)
+ .allowableValues("true", "false")
+ .defaultValue("false")
+ .build();
+
+ static final PropertyDescriptor INCREMENTAL_DELAY = new
PropertyDescriptor.Builder()
+ .name("incremental-delay")
+ .displayName("Incremental Delay")
+ .description("The ending timestamp of the time window will be
adjusted earlier by the amount configured in this property." +
+ " For example, with a property value of 10 seconds, an
ending timestamp of 12:30:45 would be changed to 12:30:35.")
+ .required(false)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .dependsOn(IS_INCREMENTAL, "true")
+ .build();
+
+ static final PropertyDescriptor INITIAL_INCREMENTAL_FILTER = new
PropertyDescriptor.Builder()
+ .name("initial-incremental-filter")
+ .displayName("Initial Incremental Start Time")
+ .description("This property specifies the start time as Epoch Time
that the processor applies when running the first request.")
Review Comment:
It would be a better user experience to configure it in a human-readable format.
Suggested format: 2022-09-22T21:50:18.000Z
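
A small sketch of how such a value could be converted to the epoch milliseconds used in the filters, assuming the property holds an ISO-8601 UTC timestamp. `StartTimeParser` and its methods are hypothetical names for this example, not existing code.

```java
import java.time.Instant;
import java.time.format.DateTimeParseException;

// Hypothetical helper for illustration only.
final class StartTimeParser {
    // Converts an ISO-8601 UTC timestamp such as 2022-09-22T21:50:18.000Z
    // to milliseconds since the epoch.
    static long toEpochMillis(final String isoTimestamp) {
        return Instant.parse(isoTimestamp).toEpochMilli();
    }

    // Could back a custom validator: true only if the value parses as an Instant.
    static boolean isValid(final String isoTimestamp) {
        try {
            Instant.parse(isoTimestamp);
            return true;
        } catch (final DateTimeParseException e) {
            return false;
        }
    }
}
```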
##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/IncrementalFieldType.java:
##########
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hubspot;
+
+public enum IncrementalFieldType {
+ LAST_MODIFIED_DATE("lastmodifieddate"),
+ HS_LAST_MODIFIED_DATE("hs_lastmodifieddate");
+
+ final String value;
Review Comment:
```suggestion
private final String value;
```
##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java:
##########
@@ -187,61 +243,122 @@ private String getResponseBodyAsString(final
ProcessContext context, final HttpR
}
}
- private OutputStreamCallback parseHttpResponse(ProcessContext context,
String endpoint, StateMap state, HttpResponseEntity response, AtomicInteger
objectCountHolder) {
+ private OutputStreamCallback parseHttpResponse(final ProcessContext
context, final HttpResponseEntity response, final AtomicInteger total,
+ final Map<String, String>
stateMap) {
return out -> {
try (final JsonParser jsonParser =
JSON_FACTORY.createParser(response.body());
final JsonGenerator jsonGenerator =
JSON_FACTORY.createGenerator(out, JsonEncoding.UTF8)) {
+ boolean isCursorAvailable = false;
+ final String objectType =
context.getProperty(OBJECT_TYPE).getValue();
+ final String cursorKey = String.format(CURSOR_KEY_PATTERN,
objectType);
while (jsonParser.nextToken() != null) {
+ if (jsonParser.getCurrentToken() == JsonToken.FIELD_NAME
&& jsonParser.getCurrentName()
+ .equals("total")) {
+ jsonParser.nextToken();
+ total.set(jsonParser.getIntValue());
+ }
if (jsonParser.getCurrentToken() == JsonToken.FIELD_NAME
&& jsonParser.getCurrentName()
.equals("results")) {
jsonParser.nextToken();
jsonGenerator.copyCurrentStructure(jsonParser);
- objectCountHolder.incrementAndGet();
}
final String fieldName = jsonParser.getCurrentName();
- if (CURSOR_PARAMETER.equals(fieldName)) {
+ if (PAGING_CURSOR.equals(fieldName)) {
+ isCursorAvailable = true;
jsonParser.nextToken();
- Map<String, String> newStateMap = new
HashMap<>(state.toMap());
- newStateMap.put(endpoint, jsonParser.getText());
- updateState(context, newStateMap);
+ stateMap.put(cursorKey, jsonParser.getText());
break;
}
}
+ if (!isCursorAvailable) {
+ stateMap.put(cursorKey, NO_PAGING);
+ }
}
};
}
- HttpUriBuilder getBaseUri(final ProcessContext context) {
+ URI getBaseUri(final ProcessContext context) {
final String path = context.getProperty(OBJECT_TYPE).getValue();
return webClientServiceProvider.getHttpUriBuilder()
.scheme(HTTPS)
.host(API_BASE_URI)
- .encodedPath(path);
+ .encodedPath(path + "/search")
+ .build();
}
- private HttpResponseEntity getHttpResponseEntity(final String accessToken,
final URI uri) {
+ private HttpResponseEntity getHttpResponseEntity(final String accessToken,
final URI uri, final String filters) {
+ final JsonInputStreamConverter converter = new
JsonInputStreamConverter(filters);
return webClientServiceProvider.getWebClientService()
- .get()
+ .post()
.uri(uri)
.header("Authorization", "Bearer " + accessToken)
+ .header("Content-Type", "application/json")
+ .body(converter.getInputStream(),
OptionalLong.of(converter.getByteSize()))
.retrieve();
}
- private URI createUri(final ProcessContext context, final StateMap state) {
- final String path = context.getProperty(OBJECT_TYPE).getValue();
- final HttpUriBuilder uriBuilder = getBaseUri(context);
+ String createIncrementalFilters(final ProcessContext context, final
Map<String, String> stateMap) {
+ final String limit = context.getProperty(RESULT_LIMIT).getValue();
+ final String objectType = context.getProperty(OBJECT_TYPE).getValue();
+ final HubSpotObjectType hubSpotObjectType =
objectTypeLookupMap.get(objectType);
+ final Long incrDelayMs =
context.getProperty(INCREMENTAL_DELAY).asTimePeriod(TimeUnit.MILLISECONDS);
+ final String startIncrementalKey = String.format("start: %s",
objectType);
+ final String endIncrementalKey = String.format("end: %s", objectType);
+ final String cursorKey = String.format(CURSOR_KEY_PATTERN, objectType);
+
+ final ObjectNode root = OBJECT_MAPPER.createObjectNode();
+ root.put("limit", limit);
- final boolean isLimitSet =
context.getProperty(RESULT_LIMIT).evaluateAttributeExpressions().isSet();
- if (isLimitSet) {
- final String limit = context.getProperty(RESULT_LIMIT).getValue();
- uriBuilder.addQueryParameter(LIMIT_PARAMETER, limit);
+ final String cursor = stateMap.get(cursorKey);
+ if (cursor != null && !NO_PAGING.equals(cursor)) {
+ root.put(PAGING_CURSOR, stateMap.get(cursorKey));
}
+ final boolean isIncremental =
context.getProperty(IS_INCREMENTAL).asBoolean();
+ if (isIncremental) {
+
+ final String hubspotSpecificIncrementalFieldName =
hubSpotObjectType.getLastModifiedDateType().getValue();
+ final String lastStartTime = stateMap.get(startIncrementalKey);
+ final String lastEndTime =
stateMap.getOrDefault(endIncrementalKey,
context.getProperty(INITIAL_INCREMENTAL_FILTER).getValue());
+
+ String currentStartTime;
+ String currentEndTime;
+
+ if (cursor != null && !NO_PAGING.equals(cursor)) {
+ currentStartTime = lastStartTime;
+ // lastEndTime can be null if incremental loading was turned
off beforehand
+ currentEndTime = lastEndTime != null ? lastEndTime :
String.valueOf(getCurrentEpochTime());
+ } else {
+ currentStartTime = lastEndTime;
+ final long delayedCurrentEndTime = incrDelayMs != null ?
getCurrentEpochTime() - incrDelayMs : getCurrentEpochTime();
+ currentEndTime = String.valueOf(delayedCurrentEndTime);
- final String cursor = state.get(path);
- if (cursor != null) {
- uriBuilder.addQueryParameter(CURSOR_PARAMETER, cursor);
+ stateMap.put(startIncrementalKey, currentStartTime);
+ stateMap.put(endIncrementalKey, currentEndTime);
+ }
+
+ final ArrayNode filters = OBJECT_MAPPER.createArrayNode();
+
+ if (currentStartTime != null) {
+ final ObjectNode greaterThanFilterNode =
OBJECT_MAPPER.createObjectNode();
+ greaterThanFilterNode.put("propertyName",
hubspotSpecificIncrementalFieldName);
+ greaterThanFilterNode.put("operator", "GT");
+ greaterThanFilterNode.put("value", currentStartTime);
+ filters.add(greaterThanFilterNode);
+ }
+
+ final ObjectNode lessThanFilterNode =
OBJECT_MAPPER.createObjectNode();
+ lessThanFilterNode.put("propertyName",
hubspotSpecificIncrementalFieldName);
+ lessThanFilterNode.put("operator", "LT");
+ lessThanFilterNode.put("value", currentEndTime);
+ filters.add(lessThanFilterNode);
+
+ root.set("filters", filters);
}
- return uriBuilder.build();
+ return root.toString();
+ }
+
+ long getCurrentEpochTime() {
+ return Instant.now().toEpochMilli();
}
private StateMap getStateMap(final ProcessContext context) {
Review Comment:
Please use `ProcessSession.getState()` and `setState()` instead of
`ProcessContext.getStateManager().getState()` and `setState()` because the
former is transactional and is only committed if the session commits
successfully (along with the FlowFiles sent out).
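
To make the difference concrete, here is a toy model of "staged until commit" state handling. It only mimics the behaviour described above and is not the NiFi `ProcessSession` API; `StagedState` and its methods are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model only: state written during a session becomes visible
// only when the session commits, and is discarded on rollback.
final class StagedState {
    private final Map<String, String> committed = new HashMap<>();
    private Map<String, String> pending;

    // Stages a new state map without touching the committed one.
    void setState(final Map<String, String> newState) {
        pending = new HashMap<>(newState);
    }

    // Makes the staged state the committed state.
    void commit() {
        if (pending != null) {
            committed.clear();
            committed.putAll(pending);
            pending = null;
        }
    }

    // Drops the staged state, leaving the committed state unchanged.
    void rollback() {
        pending = null;
    }

    // Returns a copy of the committed state.
    Map<String, String> getCommitted() {
        return new HashMap<>(committed);
    }
}
```

If the session fails after the cursor is staged, the stored cursor stays at its previous value, so the same page is requested again instead of being skipped.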
##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java:
##########
@@ -187,61 +243,122 @@ private String getResponseBodyAsString(final
ProcessContext context, final HttpR
}
}
- private OutputStreamCallback parseHttpResponse(ProcessContext context,
String endpoint, StateMap state, HttpResponseEntity response, AtomicInteger
objectCountHolder) {
+ private OutputStreamCallback parseHttpResponse(final ProcessContext
context, final HttpResponseEntity response, final AtomicInteger total,
+ final Map<String, String>
stateMap) {
return out -> {
try (final JsonParser jsonParser =
JSON_FACTORY.createParser(response.body());
final JsonGenerator jsonGenerator =
JSON_FACTORY.createGenerator(out, JsonEncoding.UTF8)) {
+ boolean isCursorAvailable = false;
+ final String objectType =
context.getProperty(OBJECT_TYPE).getValue();
+ final String cursorKey = String.format(CURSOR_KEY_PATTERN,
objectType);
while (jsonParser.nextToken() != null) {
+ if (jsonParser.getCurrentToken() == JsonToken.FIELD_NAME
&& jsonParser.getCurrentName()
+ .equals("total")) {
+ jsonParser.nextToken();
+ total.set(jsonParser.getIntValue());
+ }
if (jsonParser.getCurrentToken() == JsonToken.FIELD_NAME
&& jsonParser.getCurrentName()
.equals("results")) {
jsonParser.nextToken();
jsonGenerator.copyCurrentStructure(jsonParser);
- objectCountHolder.incrementAndGet();
}
final String fieldName = jsonParser.getCurrentName();
- if (CURSOR_PARAMETER.equals(fieldName)) {
+ if (PAGING_CURSOR.equals(fieldName)) {
+ isCursorAvailable = true;
jsonParser.nextToken();
- Map<String, String> newStateMap = new
HashMap<>(state.toMap());
- newStateMap.put(endpoint, jsonParser.getText());
- updateState(context, newStateMap);
+ stateMap.put(cursorKey, jsonParser.getText());
break;
}
}
+ if (!isCursorAvailable) {
+ stateMap.put(cursorKey, NO_PAGING);
+ }
}
};
}
- HttpUriBuilder getBaseUri(final ProcessContext context) {
+ URI getBaseUri(final ProcessContext context) {
final String path = context.getProperty(OBJECT_TYPE).getValue();
return webClientServiceProvider.getHttpUriBuilder()
.scheme(HTTPS)
.host(API_BASE_URI)
- .encodedPath(path);
+ .encodedPath(path + "/search")
+ .build();
}
- private HttpResponseEntity getHttpResponseEntity(final String accessToken,
final URI uri) {
+ private HttpResponseEntity getHttpResponseEntity(final String accessToken,
final URI uri, final String filters) {
+ final JsonInputStreamConverter converter = new
JsonInputStreamConverter(filters);
return webClientServiceProvider.getWebClientService()
- .get()
+ .post()
.uri(uri)
.header("Authorization", "Bearer " + accessToken)
+ .header("Content-Type", "application/json")
+ .body(converter.getInputStream(),
OptionalLong.of(converter.getByteSize()))
.retrieve();
}
- private URI createUri(final ProcessContext context, final StateMap state) {
- final String path = context.getProperty(OBJECT_TYPE).getValue();
- final HttpUriBuilder uriBuilder = getBaseUri(context);
+ String createIncrementalFilters(final ProcessContext context, final
Map<String, String> stateMap) {
+ final String limit = context.getProperty(RESULT_LIMIT).getValue();
+ final String objectType = context.getProperty(OBJECT_TYPE).getValue();
+ final HubSpotObjectType hubSpotObjectType =
objectTypeLookupMap.get(objectType);
+ final Long incrDelayMs =
context.getProperty(INCREMENTAL_DELAY).asTimePeriod(TimeUnit.MILLISECONDS);
+ final String startIncrementalKey = String.format("start: %s",
objectType);
+ final String endIncrementalKey = String.format("end: %s", objectType);
+ final String cursorKey = String.format(CURSOR_KEY_PATTERN, objectType);
+
+ final ObjectNode root = OBJECT_MAPPER.createObjectNode();
+ root.put("limit", limit);
- final boolean isLimitSet =
context.getProperty(RESULT_LIMIT).evaluateAttributeExpressions().isSet();
- if (isLimitSet) {
- final String limit = context.getProperty(RESULT_LIMIT).getValue();
- uriBuilder.addQueryParameter(LIMIT_PARAMETER, limit);
+ final String cursor = stateMap.get(cursorKey);
+ if (cursor != null && !NO_PAGING.equals(cursor)) {
+ root.put(PAGING_CURSOR, stateMap.get(cursorKey));
}
+ final boolean isIncremental =
context.getProperty(IS_INCREMENTAL).asBoolean();
+ if (isIncremental) {
+
+ final String hubspotSpecificIncrementalFieldName =
hubSpotObjectType.getLastModifiedDateType().getValue();
+ final String lastStartTime = stateMap.get(startIncrementalKey);
+ final String lastEndTime =
stateMap.getOrDefault(endIncrementalKey,
context.getProperty(INITIAL_INCREMENTAL_FILTER).getValue());
+
+ String currentStartTime;
+ String currentEndTime;
+
+ if (cursor != null && !NO_PAGING.equals(cursor)) {
+ currentStartTime = lastStartTime;
+ // lastEndTime can be null if incremental loading was turned
off beforehand
+ currentEndTime = lastEndTime != null ? lastEndTime :
String.valueOf(getCurrentEpochTime());
+ } else {
+ currentStartTime = lastEndTime;
+ final long delayedCurrentEndTime = incrDelayMs != null ?
getCurrentEpochTime() - incrDelayMs : getCurrentEpochTime();
+ currentEndTime = String.valueOf(delayedCurrentEndTime);
- final String cursor = state.get(path);
- if (cursor != null) {
- uriBuilder.addQueryParameter(CURSOR_PARAMETER, cursor);
+ stateMap.put(startIncrementalKey, currentStartTime);
+ stateMap.put(endIncrementalKey, currentEndTime);
+ }
+
+ final ArrayNode filters = OBJECT_MAPPER.createArrayNode();
+
+ if (currentStartTime != null) {
+ final ObjectNode greaterThanFilterNode =
OBJECT_MAPPER.createObjectNode();
+ greaterThanFilterNode.put("propertyName",
hubspotSpecificIncrementalFieldName);
+ greaterThanFilterNode.put("operator", "GT");
Review Comment:
One end of the time window must be inclusive so that objects whose timestamp
matches a window boundary are not lost.
```suggestion
greaterThanFilterNode.put("operator", "GTE");
```
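To illustrate the boundary problem, here is a minimal plain-Java sketch. It assumes the upper bound of the window is exclusive (the end filter is not visible in this hunk), and the class and method names are hypothetical, not part of the processor. An object whose timestamp equals the boundary between two consecutive windows is matched by neither window when both ends are exclusive, and by exactly one window when the start is inclusive.

```java
final class TimeWindowSketch {

    // Both ends exclusive: corresponds to a GT start filter plus an (assumed) LT end filter.
    static boolean inExclusiveWindow(final long timestamp, final long start, final long end) {
        return timestamp > start && timestamp < end;
    }

    // Half-open window [start, end): corresponds to a GTE start filter plus an (assumed) LT end filter.
    static boolean inHalfOpenWindow(final long timestamp, final long start, final long end) {
        return timestamp >= start && timestamp < end;
    }

    // Counts how many of the two consecutive windows [a, b) and [b, c) pick up the timestamp.
    static int matchCount(final long timestamp, final long a, final long b, final long c, final boolean inclusiveStart) {
        int count = 0;
        if (inclusiveStart ? inHalfOpenWindow(timestamp, a, b) : inExclusiveWindow(timestamp, a, b)) {
            count++;
        }
        if (inclusiveStart ? inHalfOpenWindow(timestamp, b, c) : inExclusiveWindow(timestamp, b, c)) {
            count++;
        }
        return count;
    }
}
```

With windows 1000..2000 and 2000..3000, a timestamp of 2000 is returned zero times with exclusive bounds and once with the inclusive start, so nothing is lost or duplicated.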
##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java:
##########
@@ -154,17 +207,20 @@ public void onTrigger(final ProcessContext context, final
ProcessSession session
final String accessToken =
context.getProperty(ACCESS_TOKEN).getValue();
final String endpoint = context.getProperty(OBJECT_TYPE).getValue();
- final StateMap state = getStateMap(context);
- final URI uri = createUri(context, state);
+ final URI uri = getBaseUri(context);
- final HttpResponseEntity response = getHttpResponseEntity(accessToken,
uri);
- final AtomicInteger objectCountHolder = new AtomicInteger();
+ final AtomicInteger total = new AtomicInteger(-1);
+ final StateMap state = getStateMap(context);
+ final Map<String, String> stateMap = new HashMap<>(state.toMap());
+ final String filters = createIncrementalFilters(context, stateMap);
+ final HttpResponseEntity response = getHttpResponseEntity(accessToken,
uri, filters);
if (response.statusCode() == HttpResponseStatus.OK.getCode()) {
FlowFile flowFile = session.create();
- flowFile = session.write(flowFile, parseHttpResponse(context,
endpoint, state, response, objectCountHolder));
- if (objectCountHolder.get() > 0) {
+ flowFile = session.write(flowFile, parseHttpResponse(context,
response, total, stateMap));
+ if (total.get() > 0) {
session.transfer(flowFile, REL_SUCCESS);
+ updateState(context, stateMap);
Review Comment:
I would update the state with the new start/end times even when no new
objects were found in this round, so that the progress is visible in the
state variables.
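A rough sketch of the control flow I have in mind, in plain Java with hypothetical names (the `persisted` map only stands in for the processor state that `updateState(context, stateMap)` writes): the transfer stays conditional on `total`, while the state is persisted in every round.

```java
import java.util.HashMap;
import java.util.Map;

final class StateUpdateSketch {

    // Stand-in for the persisted processor state (hypothetical, for illustration only).
    final Map<String, String> persisted = new HashMap<>();

    // Simulates one round; returns true when a FlowFile would be transferred to success.
    boolean round(final Map<String, String> stateMap, final int total) {
        final boolean hasResults = total > 0;
        // Persist unconditionally so the time window bounds advance even on empty rounds.
        persisted.clear();
        persisted.putAll(stateMap);
        return hasResults;
    }
}
```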
##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java:
##########
@@ -187,61 +243,122 @@ private String getResponseBodyAsString(final
ProcessContext context, final HttpR
}
}
- private OutputStreamCallback parseHttpResponse(ProcessContext context,
String endpoint, StateMap state, HttpResponseEntity response, AtomicInteger
objectCountHolder) {
+ private OutputStreamCallback parseHttpResponse(final ProcessContext
context, final HttpResponseEntity response, final AtomicInteger total,
+ final Map<String, String>
stateMap) {
return out -> {
try (final JsonParser jsonParser =
JSON_FACTORY.createParser(response.body());
final JsonGenerator jsonGenerator =
JSON_FACTORY.createGenerator(out, JsonEncoding.UTF8)) {
+ boolean isCursorAvailable = false;
+ final String objectType =
context.getProperty(OBJECT_TYPE).getValue();
+ final String cursorKey = String.format(CURSOR_KEY_PATTERN,
objectType);
while (jsonParser.nextToken() != null) {
+ if (jsonParser.getCurrentToken() == JsonToken.FIELD_NAME
&& jsonParser.getCurrentName()
+ .equals("total")) {
+ jsonParser.nextToken();
+ total.set(jsonParser.getIntValue());
+ }
if (jsonParser.getCurrentToken() == JsonToken.FIELD_NAME
&& jsonParser.getCurrentName()
.equals("results")) {
jsonParser.nextToken();
jsonGenerator.copyCurrentStructure(jsonParser);
- objectCountHolder.incrementAndGet();
}
final String fieldName = jsonParser.getCurrentName();
- if (CURSOR_PARAMETER.equals(fieldName)) {
+ if (PAGING_CURSOR.equals(fieldName)) {
+ isCursorAvailable = true;
jsonParser.nextToken();
- Map<String, String> newStateMap = new
HashMap<>(state.toMap());
- newStateMap.put(endpoint, jsonParser.getText());
- updateState(context, newStateMap);
+ stateMap.put(cursorKey, jsonParser.getText());
break;
}
}
+ if (!isCursorAvailable) {
+ stateMap.put(cursorKey, NO_PAGING);
+ }
}
};
}
- HttpUriBuilder getBaseUri(final ProcessContext context) {
+ URI getBaseUri(final ProcessContext context) {
final String path = context.getProperty(OBJECT_TYPE).getValue();
return webClientServiceProvider.getHttpUriBuilder()
.scheme(HTTPS)
.host(API_BASE_URI)
- .encodedPath(path);
+ .encodedPath(path + "/search")
+ .build();
}
- private HttpResponseEntity getHttpResponseEntity(final String accessToken,
final URI uri) {
+ private HttpResponseEntity getHttpResponseEntity(final String accessToken,
final URI uri, final String filters) {
+ final JsonInputStreamConverter converter = new
JsonInputStreamConverter(filters);
return webClientServiceProvider.getWebClientService()
- .get()
+ .post()
.uri(uri)
.header("Authorization", "Bearer " + accessToken)
+ .header("Content-Type", "application/json")
+ .body(converter.getInputStream(),
OptionalLong.of(converter.getByteSize()))
.retrieve();
}
- private URI createUri(final ProcessContext context, final StateMap state) {
- final String path = context.getProperty(OBJECT_TYPE).getValue();
- final HttpUriBuilder uriBuilder = getBaseUri(context);
+ String createIncrementalFilters(final ProcessContext context, final
Map<String, String> stateMap) {
+ final String limit = context.getProperty(RESULT_LIMIT).getValue();
+ final String objectType = context.getProperty(OBJECT_TYPE).getValue();
+ final HubSpotObjectType hubSpotObjectType =
objectTypeLookupMap.get(objectType);
+ final Long incrDelayMs =
context.getProperty(INCREMENTAL_DELAY).asTimePeriod(TimeUnit.MILLISECONDS);
+ final String startIncrementalKey = String.format("start: %s",
objectType);
+ final String endIncrementalKey = String.format("end: %s", objectType);
+ final String cursorKey = String.format(CURSOR_KEY_PATTERN, objectType);
+
+ final ObjectNode root = OBJECT_MAPPER.createObjectNode();
+ root.put("limit", limit);
- final boolean isLimitSet =
context.getProperty(RESULT_LIMIT).evaluateAttributeExpressions().isSet();
- if (isLimitSet) {
- final String limit = context.getProperty(RESULT_LIMIT).getValue();
- uriBuilder.addQueryParameter(LIMIT_PARAMETER, limit);
+ final String cursor = stateMap.get(cursorKey);
+ if (cursor != null && !NO_PAGING.equals(cursor)) {
+ root.put(PAGING_CURSOR, stateMap.get(cursorKey));
}
+ final boolean isIncremental =
context.getProperty(IS_INCREMENTAL).asBoolean();
+ if (isIncremental) {
+
+ final String hubspotSpecificIncrementalFieldName =
hubSpotObjectType.getLastModifiedDateType().getValue();
+ final String lastStartTime = stateMap.get(startIncrementalKey);
+ final String lastEndTime =
stateMap.getOrDefault(endIncrementalKey,
context.getProperty(INITIAL_INCREMENTAL_FILTER).getValue());
+
+ String currentStartTime;
+ String currentEndTime;
Review Comment:
These could be `final` too.
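Java accepts `final` locals that are assigned exactly once in each branch of an `if`/`else`. A simplified sketch of the shape, with a hypothetical helper and parameters standing in for the state lookups and `getCurrentEpochTime()` (the delay handling and state writes are left out):

```java
final class FinalLocalsSketch {

    // Returns {currentStartTime, currentEndTime}; 'now' stands in for getCurrentEpochTime().
    static String[] computeWindow(final String cursor, final String lastStartTime,
                                  final String lastEndTime, final long now) {
        final String currentStartTime;
        final String currentEndTime;

        if (cursor != null) {
            // Paging in progress: keep the previous window.
            currentStartTime = lastStartTime;
            currentEndTime = lastEndTime != null ? lastEndTime : String.valueOf(now);
        } else {
            // New round: the previous end becomes the new start.
            currentStartTime = lastEndTime;
            currentEndTime = String.valueOf(now);
        }
        return new String[] {currentStartTime, currentEndTime};
    }
}
```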
##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java:
##########
@@ -187,61 +243,122 @@ private String getResponseBodyAsString(final
ProcessContext context, final HttpR
}
}
- private OutputStreamCallback parseHttpResponse(ProcessContext context,
String endpoint, StateMap state, HttpResponseEntity response, AtomicInteger
objectCountHolder) {
+ private OutputStreamCallback parseHttpResponse(final ProcessContext
context, final HttpResponseEntity response, final AtomicInteger total,
+ final Map<String, String>
stateMap) {
return out -> {
try (final JsonParser jsonParser =
JSON_FACTORY.createParser(response.body());
final JsonGenerator jsonGenerator =
JSON_FACTORY.createGenerator(out, JsonEncoding.UTF8)) {
+ boolean isCursorAvailable = false;
+ final String objectType =
context.getProperty(OBJECT_TYPE).getValue();
+ final String cursorKey = String.format(CURSOR_KEY_PATTERN,
objectType);
while (jsonParser.nextToken() != null) {
+ if (jsonParser.getCurrentToken() == JsonToken.FIELD_NAME
&& jsonParser.getCurrentName()
+ .equals("total")) {
+ jsonParser.nextToken();
+ total.set(jsonParser.getIntValue());
+ }
if (jsonParser.getCurrentToken() == JsonToken.FIELD_NAME
&& jsonParser.getCurrentName()
.equals("results")) {
jsonParser.nextToken();
jsonGenerator.copyCurrentStructure(jsonParser);
- objectCountHolder.incrementAndGet();
}
final String fieldName = jsonParser.getCurrentName();
- if (CURSOR_PARAMETER.equals(fieldName)) {
+ if (PAGING_CURSOR.equals(fieldName)) {
+ isCursorAvailable = true;
jsonParser.nextToken();
- Map<String, String> newStateMap = new
HashMap<>(state.toMap());
- newStateMap.put(endpoint, jsonParser.getText());
- updateState(context, newStateMap);
+ stateMap.put(cursorKey, jsonParser.getText());
break;
}
}
+ if (!isCursorAvailable) {
+ stateMap.put(cursorKey, NO_PAGING);
+ }
}
};
}
- HttpUriBuilder getBaseUri(final ProcessContext context) {
+ URI getBaseUri(final ProcessContext context) {
final String path = context.getProperty(OBJECT_TYPE).getValue();
return webClientServiceProvider.getHttpUriBuilder()
.scheme(HTTPS)
.host(API_BASE_URI)
- .encodedPath(path);
+ .encodedPath(path + "/search")
+ .build();
}
- private HttpResponseEntity getHttpResponseEntity(final String accessToken,
final URI uri) {
+ private HttpResponseEntity getHttpResponseEntity(final String accessToken,
final URI uri, final String filters) {
+ final JsonInputStreamConverter converter = new
JsonInputStreamConverter(filters);
return webClientServiceProvider.getWebClientService()
- .get()
+ .post()
.uri(uri)
.header("Authorization", "Bearer " + accessToken)
+ .header("Content-Type", "application/json")
+ .body(converter.getInputStream(),
OptionalLong.of(converter.getByteSize()))
.retrieve();
}
- private URI createUri(final ProcessContext context, final StateMap state) {
- final String path = context.getProperty(OBJECT_TYPE).getValue();
- final HttpUriBuilder uriBuilder = getBaseUri(context);
+ String createIncrementalFilters(final ProcessContext context, final
Map<String, String> stateMap) {
Review Comment:
```suggestion
private String createIncrementalFilters(final ProcessContext context,
final Map<String, String> stateMap) {
```
##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java:
##########
@@ -187,61 +243,122 @@ private String getResponseBodyAsString(final
ProcessContext context, final HttpR
}
}
- private OutputStreamCallback parseHttpResponse(ProcessContext context,
String endpoint, StateMap state, HttpResponseEntity response, AtomicInteger
objectCountHolder) {
+ private OutputStreamCallback parseHttpResponse(final ProcessContext
context, final HttpResponseEntity response, final AtomicInteger total,
+ final Map<String, String>
stateMap) {
return out -> {
try (final JsonParser jsonParser =
JSON_FACTORY.createParser(response.body());
final JsonGenerator jsonGenerator =
JSON_FACTORY.createGenerator(out, JsonEncoding.UTF8)) {
+ boolean isCursorAvailable = false;
+ final String objectType =
context.getProperty(OBJECT_TYPE).getValue();
+ final String cursorKey = String.format(CURSOR_KEY_PATTERN,
objectType);
while (jsonParser.nextToken() != null) {
+ if (jsonParser.getCurrentToken() == JsonToken.FIELD_NAME
&& jsonParser.getCurrentName()
+ .equals("total")) {
+ jsonParser.nextToken();
+ total.set(jsonParser.getIntValue());
+ }
if (jsonParser.getCurrentToken() == JsonToken.FIELD_NAME
&& jsonParser.getCurrentName()
.equals("results")) {
jsonParser.nextToken();
jsonGenerator.copyCurrentStructure(jsonParser);
- objectCountHolder.incrementAndGet();
}
final String fieldName = jsonParser.getCurrentName();
- if (CURSOR_PARAMETER.equals(fieldName)) {
+ if (PAGING_CURSOR.equals(fieldName)) {
+ isCursorAvailable = true;
jsonParser.nextToken();
- Map<String, String> newStateMap = new
HashMap<>(state.toMap());
- newStateMap.put(endpoint, jsonParser.getText());
- updateState(context, newStateMap);
+ stateMap.put(cursorKey, jsonParser.getText());
break;
}
}
+ if (!isCursorAvailable) {
+ stateMap.put(cursorKey, NO_PAGING);
+ }
}
};
}
- HttpUriBuilder getBaseUri(final ProcessContext context) {
+ URI getBaseUri(final ProcessContext context) {
final String path = context.getProperty(OBJECT_TYPE).getValue();
return webClientServiceProvider.getHttpUriBuilder()
.scheme(HTTPS)
.host(API_BASE_URI)
- .encodedPath(path);
+ .encodedPath(path + "/search")
+ .build();
}
- private HttpResponseEntity getHttpResponseEntity(final String accessToken,
final URI uri) {
+ private HttpResponseEntity getHttpResponseEntity(final String accessToken,
final URI uri, final String filters) {
+ final JsonInputStreamConverter converter = new
JsonInputStreamConverter(filters);
return webClientServiceProvider.getWebClientService()
- .get()
+ .post()
.uri(uri)
.header("Authorization", "Bearer " + accessToken)
+ .header("Content-Type", "application/json")
+ .body(converter.getInputStream(),
OptionalLong.of(converter.getByteSize()))
.retrieve();
}
- private URI createUri(final ProcessContext context, final StateMap state) {
- final String path = context.getProperty(OBJECT_TYPE).getValue();
- final HttpUriBuilder uriBuilder = getBaseUri(context);
+ String createIncrementalFilters(final ProcessContext context, final
Map<String, String> stateMap) {
+ final String limit = context.getProperty(RESULT_LIMIT).getValue();
+ final String objectType = context.getProperty(OBJECT_TYPE).getValue();
+ final HubSpotObjectType hubSpotObjectType =
objectTypeLookupMap.get(objectType);
+ final Long incrDelayMs =
context.getProperty(INCREMENTAL_DELAY).asTimePeriod(TimeUnit.MILLISECONDS);
+ final String startIncrementalKey = String.format("start: %s",
objectType);
+ final String endIncrementalKey = String.format("end: %s", objectType);
Review Comment:
Is it necessary to include the `objectType` in the state keys? The processor
handles only one object type at a time.
I'd suggest using more descriptive names:
- query_time_window_start
- query_time_window_end
##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/IncrementalFieldType.java:
##########
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hubspot;
+
+public enum IncrementalFieldType {
+ LAST_MODIFIED_DATE("lastmodifieddate"),
+ HS_LAST_MODIFIED_DATE("hs_lastmodifieddate");
+
+ final String value;
+
+ IncrementalFieldType(String value) {
+ this.value = value;
+ }
+
+ String getValue() {
+ return value;
+ }
Review Comment:
```suggestion
public String getValue() {
return value;
}
```