Lehel44 commented on code in PR #6379:
URL: https://github.com/apache/nifi/pull/6379#discussion_r979479849
##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java:
##########
@@ -187,78 +257,158 @@ private String getResponseBodyAsString(final
ProcessContext context, final HttpR
}
}
- private OutputStreamCallback parseHttpResponse(ProcessContext context,
String endpoint, StateMap state, HttpResponseEntity response, AtomicInteger
objectCountHolder) {
+ private OutputStreamCallback parseHttpResponse(final ProcessContext
context, final HttpResponseEntity response, final AtomicInteger total,
+ final Map<String, String>
stateMap) {
return out -> {
try (final JsonParser jsonParser =
JSON_FACTORY.createParser(response.body());
final JsonGenerator jsonGenerator =
JSON_FACTORY.createGenerator(out, JsonEncoding.UTF8)) {
+ boolean isCursorAvailable = false;
+ final String objectType =
context.getProperty(OBJECT_TYPE).getValue();
+ final String cursorKey = String.format(CURSOR_KEY_PATTERN,
objectType);
while (jsonParser.nextToken() != null) {
+ if (jsonParser.getCurrentToken() == JsonToken.FIELD_NAME
&& jsonParser.getCurrentName()
+ .equals("total")) {
+ jsonParser.nextToken();
+ total.set(jsonParser.getIntValue());
+ }
if (jsonParser.getCurrentToken() == JsonToken.FIELD_NAME
&& jsonParser.getCurrentName()
.equals("results")) {
jsonParser.nextToken();
jsonGenerator.copyCurrentStructure(jsonParser);
- objectCountHolder.incrementAndGet();
}
final String fieldName = jsonParser.getCurrentName();
- if (CURSOR_PARAMETER.equals(fieldName)) {
+ if (PAGING_CURSOR.equals(fieldName)) {
+ isCursorAvailable = true;
jsonParser.nextToken();
- Map<String, String> newStateMap = new
HashMap<>(state.toMap());
- newStateMap.put(endpoint, jsonParser.getText());
- updateState(context, newStateMap);
+ stateMap.put(cursorKey, jsonParser.getText());
break;
}
}
+ if (!isCursorAvailable) {
+ stateMap.put(cursorKey, NO_PAGING);
+ }
}
};
}
- HttpUriBuilder getBaseUri(final ProcessContext context) {
+ URI getBaseUri(final ProcessContext context) {
final String path = context.getProperty(OBJECT_TYPE).getValue();
return webClientServiceProvider.getHttpUriBuilder()
.scheme(HTTPS)
.host(API_BASE_URI)
- .encodedPath(path);
+ .encodedPath(path + "/search")
+ .build();
}
- private HttpResponseEntity getHttpResponseEntity(final String accessToken,
final URI uri) {
- return webClientServiceProvider.getWebClientService()
- .get()
- .uri(uri)
- .header("Authorization", "Bearer " + accessToken)
- .retrieve();
+ private HttpResponseEntity getHttpResponseEntity(final String accessToken,
final URI uri, final String filters) {
+ final InputStream inputStream = IOUtils.toInputStream(filters,
StandardCharsets.UTF_8);
+ try {
+ return webClientServiceProvider.getWebClientService()
+ .post()
+ .uri(uri)
+ .header("Authorization", "Bearer " + accessToken)
+ .header("Content-Type", "application/json")
+ .body(inputStream,
OptionalLong.of(inputStream.available()))
+ .retrieve();
+ } catch (IOException e) {
+ throw new ProcessException("Could not transform incremental
filters to input stream", e);
+ }
}
- private URI createUri(final ProcessContext context, final StateMap state) {
- final String path = context.getProperty(OBJECT_TYPE).getValue();
- final HttpUriBuilder uriBuilder = getBaseUri(context);
+ private String createIncrementalFilters(final ProcessContext context,
final Map<String, String> stateMap) {
+ final String limit = context.getProperty(RESULT_LIMIT).getValue();
+ final String objectType = context.getProperty(OBJECT_TYPE).getValue();
+ final HubSpotObjectType hubSpotObjectType =
OBJECT_TYPE_LOOKUP_MAP.get(objectType);
+ final Long incrDelayMs =
context.getProperty(INCREMENTAL_DELAY).asTimePeriod(TimeUnit.MILLISECONDS);
+ final String cursorKey = String.format(CURSOR_KEY_PATTERN, objectType);
- final boolean isLimitSet =
context.getProperty(RESULT_LIMIT).evaluateAttributeExpressions().isSet();
- if (isLimitSet) {
- final String limit = context.getProperty(RESULT_LIMIT).getValue();
- uriBuilder.addQueryParameter(LIMIT_PARAMETER, limit);
+ final ObjectNode root = OBJECT_MAPPER.createObjectNode();
+ root.put("limit", limit);
+
+ final String cursor = stateMap.get(cursorKey);
+ if (cursor != null && !NO_PAGING.equals(cursor)) {
+ root.put(PAGING_CURSOR, stateMap.get(cursorKey));
}
+ final boolean isIncremental =
context.getProperty(IS_INCREMENTAL).asBoolean();
+ if (isIncremental) {
+ final String initialStartTimeValue =
context.getProperty(INCREMENTAL_INITIAL_START_TIME).getValue();
+ String initialStartTimeEpoch =
getInitialStartTimeEpoch(initialStartTimeValue);
+ final String hubspotSpecificIncrementalFieldName =
hubSpotObjectType.getLastModifiedDateType().getValue();
+ final String lastStartTime = stateMap.get(START_INCREMENTAL_KEY);
+ final String lastEndTime =
stateMap.getOrDefault(END_INCREMENTAL_KEY, initialStartTimeEpoch);
+
+ final String currentStartTime;
+ final String currentEndTime;
+
+ if (cursor != null && !NO_PAGING.equals(cursor)) {
+ currentStartTime = lastStartTime;
+ // lastEndTime can be null if incremental loading was turned
off beforehand
+ currentEndTime = lastEndTime != null ? lastEndTime :
String.valueOf(getCurrentEpochTime());
+ } else {
+ currentStartTime = lastEndTime;
+ final long delayedCurrentEndTime = incrDelayMs != null ?
getCurrentEpochTime() - incrDelayMs : getCurrentEpochTime();
+ currentEndTime = String.valueOf(delayedCurrentEndTime);
+
+ stateMap.put(START_INCREMENTAL_KEY, currentStartTime);
+ stateMap.put(END_INCREMENTAL_KEY, currentEndTime);
+ }
+
+ final ArrayNode filters = OBJECT_MAPPER.createArrayNode();
- final String cursor = state.get(path);
- if (cursor != null) {
- uriBuilder.addQueryParameter(CURSOR_PARAMETER, cursor);
+ if (currentStartTime != null) {
+ final ObjectNode greaterThanFilterNode =
OBJECT_MAPPER.createObjectNode();
+ greaterThanFilterNode.put("propertyName",
hubspotSpecificIncrementalFieldName);
+ greaterThanFilterNode.put("operator", "GTE");
+ greaterThanFilterNode.put("value", currentStartTime);
+ filters.add(greaterThanFilterNode);
+ }
+
+ final ObjectNode lessThanFilterNode =
OBJECT_MAPPER.createObjectNode();
+ lessThanFilterNode.put("propertyName",
hubspotSpecificIncrementalFieldName);
+ lessThanFilterNode.put("operator", "LT");
+ lessThanFilterNode.put("value", currentEndTime);
+ filters.add(lessThanFilterNode);
+
+ root.set("filters", filters);
}
- return uriBuilder.build();
+ return root.toString();
}
- private StateMap getStateMap(final ProcessContext context) {
+ private String getInitialStartTimeEpoch(String initialStartTimeValue) {
+ if (initialStartTimeValue != null) {
+ final TemporalAccessor initialDateTime =
DateTimeFormatter.ISO_DATE_TIME.parse(initialStartTimeValue);
+ return
String.valueOf(Instant.from(initialDateTime).toEpochMilli());
+ }
+ return null;
+ }
+
+ long getCurrentEpochTime() {
+ return Instant.now().toEpochMilli();
+ }
+
+ private Map<String, String> getStateMap(final ProcessSession session) {
final StateMap stateMap;
try {
- stateMap = context.getStateManager().getState(Scope.CLUSTER);
+ stateMap = session.getState(Scope.CLUSTER);
} catch (IOException e) {
throw new ProcessException("State retrieval failed", e);
}
- return stateMap;
+ return new HashMap<>(stateMap.toMap());
}
- private void updateState(ProcessContext context, Map<String, String>
newState) {
+ private void updateState(ProcessSession session, Map<String, String>
newState) {
try {
- context.getStateManager().setState(newState, Scope.CLUSTER);
+ session.setState(newState, Scope.CLUSTER);
} catch (IOException e) {
throw new ProcessException("Page cursor update failed", e);
}
}
+
+ private void clearState(ProcessSession session) {
+ try {
+ session.clearState(Scope.CLUSTER);
Review Comment:
It removes the items only when the session gets committed. I think that's why
calling clearState at the beginning of onTrigger won't work.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]