pvillard31 commented on code in PR #11090: URL: https://github.com/apache/nifi/pull/11090#discussion_r3027652341
########## nifi-extension-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/java/org/apache/nifi/snowflake/service/SnowpipeIngestClient.java: ########## @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.snowflake.service; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.nifi.processors.snowflake.snowpipe.InsertFiles; +import org.apache.nifi.processors.snowflake.snowpipe.InsertReport; + +import java.io.IOException; +import java.net.HttpURLConnection; +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.time.Duration; +import java.util.Objects; +import java.util.UUID; + +/** + * Client for Snowflake Snowpipe REST API using Java HttpClient + */ +class SnowpipeIngestClient implements AutoCloseable { + + private static final Duration CONNECT_TIMEOUT = Duration.ofSeconds(10); + + private static final String INSERT_FILES_PATH = "/v1/data/pipes/%s/insertFiles"; + + private static final String INSERT_REPORT_PATH = "/v1/data/pipes/%s/insertReport"; + + private static final String REQUEST_ID_FORMAT = "%s?requestId=%s"; + + private static final String AUTHORIZATION_HEADER = "Authorization"; + + private static final String CONTENT_TYPE_HEADER = "Content-Type"; + + private static final String ACCEPT_HEADER = "Accept"; + + private static final String APPLICATION_JSON = "application/json"; + + private static final ObjectMapper objectMapper = new ObjectMapper().disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES); Review Comment: Static final fields should follow UPPER_SNAKE_CASE convention: OBJECT_MAPPER ########## nifi-extension-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/java/org/apache/nifi/snowflake/service/SnowpipeIngestClient.java: ########## @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.snowflake.service; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.nifi.processors.snowflake.snowpipe.InsertFiles; +import org.apache.nifi.processors.snowflake.snowpipe.InsertReport; + +import java.io.IOException; +import java.net.HttpURLConnection; +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.time.Duration; +import java.util.Objects; +import java.util.UUID; + +/** + * Client for Snowflake Snowpipe REST API using Java HttpClient + */ +class SnowpipeIngestClient implements AutoCloseable { + + private static final Duration CONNECT_TIMEOUT = Duration.ofSeconds(10); + + private static final String INSERT_FILES_PATH = "/v1/data/pipes/%s/insertFiles"; + + private static final String INSERT_REPORT_PATH = "/v1/data/pipes/%s/insertReport"; + + private static final String REQUEST_ID_FORMAT = "%s?requestId=%s"; + + private static final String AUTHORIZATION_HEADER = "Authorization"; + + private static final String CONTENT_TYPE_HEADER = "Content-Type"; + + private static final String ACCEPT_HEADER = "Accept"; + + private static final String APPLICATION_JSON = "application/json"; + + private static final ObjectMapper objectMapper = new ObjectMapper().disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES); + + private final HttpClient httpClient; + + private final URI insertFilesUri; + + private final URI insertReportUri; + + private final RSAKeyAuthorizationProvider authorizationProvider; + + /** + * Snowpipe Ingest Client with required properties + * + * @param baseUri Base URI for the Snowpipe REST API + * @param pipeName Fully qualified pipe name + * @param authorizationProvider RSA Key Authorization Provider for JWT authentication + */ + SnowpipeIngestClient( + final URI baseUri, + final String pipeName, + final RSAKeyAuthorizationProvider authorizationProvider + ) { + Objects.requireNonNull(baseUri, "Base URI required"); + Objects.requireNonNull(pipeName, "Pipe Name required"); + Objects.requireNonNull(authorizationProvider, "Authorization Provider required"); + + final String insertFilesPath = INSERT_FILES_PATH.formatted(pipeName); + this.insertFilesUri = baseUri.resolve(insertFilesPath); + + final String insertReportPath = INSERT_REPORT_PATH.formatted(pipeName); + this.insertReportUri = baseUri.resolve(insertReportPath); + + this.authorizationProvider = authorizationProvider; + this.httpClient = HttpClient.newBuilder() + .connectTimeout(CONNECT_TIMEOUT) + .build(); + } + + @Override + public void close() { + httpClient.close(); + } + + /** + * Submit staged files for ingestion via the insertFiles REST API endpoint + * + * @param insertFiles Files to ingest + * @throws SnowpipeResponseException on HTTP error responses or transport failures + */ + void insertFiles(final InsertFiles insertFiles) { + Objects.requireNonNull(insertFiles, "Insert Files required"); + + final String requestBody = serializeInsertFiles(insertFiles); + final URI requestUri = appendRequestId(insertFilesUri); + final String authorization = authorizationProvider.getRequestAuthorization().authorization(); + + final HttpRequest request = HttpRequest.newBuilder() + .uri(requestUri) + .header(AUTHORIZATION_HEADER, authorization) + .header(CONTENT_TYPE_HEADER, APPLICATION_JSON) + .POST(HttpRequest.BodyPublishers.ofString(requestBody)) + .build(); + + final HttpResponse<String> response = sendRequest(request); + final int statusCode = response.statusCode(); + if (statusCode != HttpURLConnection.HTTP_OK) { + final String message = "Insert Files POST [%s] HTTP %d Response [%s]".formatted(requestUri, statusCode, response.body()); + throw new SnowpipeResponseException(message); + } + } + + /** + * Retrieve the insert report for recently ingested files via the insertReport REST API endpoint + * + * @return Insert report containing file entries with ingestion status + * @throws SnowpipeResponseException on HTTP error responses or transport failures + */ + InsertReport getInsertReport() { + final URI requestUri = appendRequestId(insertReportUri); + final String authorization = authorizationProvider.getRequestAuthorization().authorization(); + + final HttpRequest request = HttpRequest.newBuilder() + .uri(requestUri) + .header(AUTHORIZATION_HEADER, authorization) + .header(ACCEPT_HEADER, APPLICATION_JSON) + .GET() + .build(); + + final HttpResponse<String> response = sendRequest(request); + final int statusCode = response.statusCode(); + final String responseBody = response.body(); + if (statusCode == HttpURLConnection.HTTP_OK) { + try { + return objectMapper.readValue(responseBody, InsertReport.class); + } catch (final JsonProcessingException e) { + final String message = "Insert Report [%s] response parsing failed [%s]".formatted(requestUri, responseBody); + throw new SnowpipeResponseException(message, e); + } + } else { + final String message = "Insert Report failed GET [%s] HTTP %d Response [%s]".formatted(requestUri, statusCode, responseBody); + throw new SnowpipeResponseException(message); + } + } + + private String serializeInsertFiles(final InsertFiles insertFiles) { + try { + return objectMapper.writeValueAsString(insertFiles); + } catch (final IOException e) { Review Comment: JsonProcessingException instead, no? ########## nifi-extension-bundles/nifi-snowflake-bundle/nifi-snowflake-services/src/main/java/org/apache/nifi/snowflake/service/RSAKeyAuthorizationProvider.java: ########## @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.snowflake.service; + +import java.math.BigInteger; +import java.nio.charset.StandardCharsets; +import java.security.GeneralSecurityException; +import java.security.KeyFactory; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.security.PrivateKey; +import java.security.PublicKey; +import java.security.Signature; +import java.security.SignatureException; +import java.security.interfaces.RSAPrivateCrtKey; +import java.security.spec.RSAPublicKeySpec; +import java.time.Duration; +import java.time.ZonedDateTime; +import java.util.Base64; +import java.util.Locale; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicReference; + +/** + * RSA Private Key Authorization Provider implementing Snowflake JSON Web Token with RS256 + */ +class RSAKeyAuthorizationProvider { + + private static final String FINGERPRINT_ALGORITHM = "SHA-256"; + + private static final String FINGERPRINT_FORMAT = "SHA256:%s"; + + private static final String SIGNING_ALGORITHM = "SHA256withRSA"; + + private static final String QUALIFIED_CLAIM = "%s.%s"; + + private static final String TOKEN_HEADER = "{\"alg\":\"RS256\"}"; + + private static final String TOKEN_PAYLOAD_FORMAT = """ + {"iss":"%s","sub":"%s","iat":%d,"exp":%d}"""; + + private static final String BEARER_TOKEN_AUTHORIZATION_FORMAT = "Bearer %s.%s"; + + private static final Duration EXPIRATION = Duration.ofMinutes(59); + + private static final Duration REFRESH_EXPIRATION = EXPIRATION.minusMinutes(5); + + private static final Base64.Encoder urlEncoder = Base64.getUrlEncoder().withoutPadding(); + + private static final Base64.Encoder fingerprintEncoder = Base64.getEncoder(); Review Comment: Same here? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
