Github user kevdoran commented on a diff in the pull request:
https://github.com/apache/nifi-registry/pull/148#discussion_r236837704
--- Diff:
nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/extension/StandardExtensionService.java
---
@@ -0,0 +1,589 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.service.extension;
+
+import org.apache.commons.codec.binary.Hex;
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.Validate;
+import org.apache.nifi.registry.bucket.Bucket;
+import org.apache.nifi.registry.db.entity.BucketEntity;
+import org.apache.nifi.registry.db.entity.ExtensionBundleEntity;
+import org.apache.nifi.registry.db.entity.ExtensionBundleEntityType;
+import org.apache.nifi.registry.db.entity.ExtensionBundleVersionEntity;
+import org.apache.nifi.registry.exception.ResourceNotFoundException;
+import org.apache.nifi.registry.extension.BundleCoordinate;
+import org.apache.nifi.registry.extension.BundleDetails;
+import org.apache.nifi.registry.extension.BundleExtractor;
+import org.apache.nifi.registry.extension.ExtensionBundle;
+import org.apache.nifi.registry.extension.ExtensionBundleContext;
+import
org.apache.nifi.registry.extension.ExtensionBundlePersistenceProvider;
+import org.apache.nifi.registry.extension.ExtensionBundleType;
+import org.apache.nifi.registry.extension.ExtensionBundleVersion;
+import org.apache.nifi.registry.extension.ExtensionBundleVersionDependency;
+import org.apache.nifi.registry.extension.ExtensionBundleVersionMetadata;
+import org.apache.nifi.registry.extension.repo.ExtensionRepoArtifact;
+import org.apache.nifi.registry.extension.repo.ExtensionRepoBucket;
+import org.apache.nifi.registry.extension.repo.ExtensionRepoGroup;
+import org.apache.nifi.registry.extension.repo.ExtensionRepoVersionSummary;
+import org.apache.nifi.registry.properties.NiFiRegistryProperties;
+import
org.apache.nifi.registry.provider.extension.StandardExtensionBundleContext;
+import org.apache.nifi.registry.security.authorization.user.NiFiUserUtils;
+import org.apache.nifi.registry.service.DataModelMapper;
+import org.apache.nifi.registry.service.MetadataService;
+import org.apache.nifi.registry.util.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import javax.validation.ConstraintViolation;
+import javax.validation.ConstraintViolationException;
+import javax.validation.Validator;
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.security.DigestInputStream;
+import java.security.MessageDigest;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+@Service
+public class StandardExtensionService implements ExtensionService {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(StandardExtensionService.class);
+
+ private final MetadataService metadataService;
+ private final Map<ExtensionBundleType, BundleExtractor> extractors;
+ private final ExtensionBundlePersistenceProvider
bundlePersistenceProvider;
+ private final Validator validator;
+ private final File extensionsWorkingDir;
+
+ @Autowired
+ public StandardExtensionService(final MetadataService metadataService,
+ final Map<ExtensionBundleType,
BundleExtractor> extractors,
+ final
ExtensionBundlePersistenceProvider bundlePersistenceProvider,
+ final Validator validator,
+ final NiFiRegistryProperties
properties) {
+ this.metadataService = metadataService;
+ this.extractors = extractors;
+ this.bundlePersistenceProvider = bundlePersistenceProvider;
+ this.validator = validator;
+ this.extensionsWorkingDir =
properties.getExtensionsWorkingDirectory();
+ Validate.notNull(this.metadataService);
+ Validate.notNull(this.extractors);
+ Validate.notNull(this.bundlePersistenceProvider);
+ Validate.notNull(this.validator);
+ Validate.notNull(this.extensionsWorkingDir);
+ }
+
+ private <T> void validate(T t, String invalidMessage) {
+ final Set<ConstraintViolation<T>> violations =
validator.validate(t);
+ if (violations.size() > 0) {
+ throw new ConstraintViolationException(invalidMessage,
violations);
+ }
+ }
+
+ @Override
+ public ExtensionBundleVersion createExtensionBundleVersion(final
String bucketIdentifier, final ExtensionBundleType bundleType,
+ final
InputStream inputStream) throws IOException {
+ if (StringUtils.isBlank(bucketIdentifier)) {
+ throw new IllegalArgumentException("Bucket identifier cannot
be null or blank");
+ }
+
+ if (bundleType == null) {
+ throw new IllegalArgumentException("Bundle type cannot be
null");
+ }
+
+ if (inputStream == null) {
+ throw new IllegalArgumentException("Extension bundle input
stream cannot be null");
+ }
+
+ if (!extractors.containsKey(bundleType)) {
+ throw new IllegalArgumentException("No metadata extractor is
registered for bundle-type: " + bundleType);
+ }
+
+ // ensure the bucket exists
+ final BucketEntity existingBucket =
metadataService.getBucketById(bucketIdentifier);
+ if (existingBucket == null) {
+ LOGGER.warn("The specified bucket id [{}] does not exist.",
bucketIdentifier);
+ throw new ResourceNotFoundException("The specified bucket ID
does not exist in this registry.");
+ }
+
+ // ensure the extensions directory exists and we can read and
write to it
+
FileUtils.ensureDirectoryExistAndCanReadAndWrite(extensionsWorkingDir);
+
+ final String extensionWorkingFilename =
UUID.randomUUID().toString();
+ final File extensionWorkingFile = new File(extensionsWorkingDir,
extensionWorkingFilename);
+ LOGGER.debug("Writing bundle contents to working directory at {}",
new Object[]{extensionWorkingFile.getAbsolutePath()});
+
+ try {
+ // write the contents of the input stream to a temporary file
in the extensions working directory
+ final MessageDigest sha256Digest =
DigestUtils.getSha256Digest();
+ try (final DigestInputStream digestInputStream = new
DigestInputStream(inputStream, sha256Digest);
+ final OutputStream out = new
FileOutputStream(extensionWorkingFile)) {
+ IOUtils.copy(digestInputStream, out);
+ }
+
+ final String sha256Hex =
Hex.encodeHexString(sha256Digest.digest());
+
+ // extract the details of the bundle from the temp file in the
working directory
+ final BundleDetails bundleDetails;
+ try (final InputStream in = new
FileInputStream(extensionWorkingFile)) {
+ final BundleExtractor extractor =
extractors.get(bundleType);
+ bundleDetails = extractor.extract(in);
+ }
+
+ final BundleCoordinate bundleCoordinate =
bundleDetails.getBundleCoordinate();
+ final BundleCoordinate dependencyCoordinate =
bundleDetails.getDependencyBundleCoordinate();
+
+ final String groupId = bundleCoordinate.getGroupId();
+ final String artifactId = bundleCoordinate.getArtifactId();
+ final String version = bundleCoordinate.getVersion();
+ LOGGER.debug("Extracted bundle details - '{}' - '{}' - '{}'",
new Object[]{groupId, artifactId, version});
+
+ // a bundle with the same group, artifact, and version can
exist in multiple buckets, but only if it contains the same binary content,
+ // we can determine that by comparing the SHA-256 digest of
the incoming bundle against existing bundles with the same group, artifact,
version
+ final List<ExtensionBundleVersionEntity> allExistingVersions =
metadataService.getExtensionBundleVersionsGlobal(groupId, artifactId, version);
+ for (final ExtensionBundleVersionEntity existingVersionEntity
: allExistingVersions) {
+ if
(!existingVersionEntity.getSha256Hex().equals(sha256Hex)) {
+ throw new IllegalStateException("Found existing
extension bundle with same group, artifact, and version, but different SHA-256
check-sum");
+ }
+ }
+
+ // get the existing extension bundle entity, or create a new
one if one does not exist in the bucket with the group + artifact
+ final ExtensionBundleEntity extensionBundle =
getOrCreateExtensionBundle(bucketIdentifier, groupId, artifactId, bundleType);
+
+ // ensure there isn't already a version of the bundle with the
same version
+ final ExtensionBundleVersionEntity existingVersion =
metadataService.getExtensionBundleVersion(bucketIdentifier, groupId,
artifactId, version);
+ if (existingVersion != null) {
+ LOGGER.warn("The specified version [{}] already exists for
extension bundle [{}].", new Object[]{version, extensionBundle.getId()});
+ throw new IllegalStateException("The specified version
already exists for the given extension bundle");
+ }
+
+ // create the bundle version in the metadata db
+ final String userIdentity =
NiFiUserUtils.getNiFiUserIdentity();
+ final long bundleCreatedTime =
extensionBundle.getCreated().getTime();
+
+ final ExtensionBundleVersionMetadata versionMetadata = new
ExtensionBundleVersionMetadata();
+ versionMetadata.setId(UUID.randomUUID().toString());
+ versionMetadata.setExtensionBundleId(extensionBundle.getId());
+ versionMetadata.setBucketId(bucketIdentifier);
+ versionMetadata.setVersion(version);
+ versionMetadata.setTimestamp(bundleCreatedTime);
+ versionMetadata.setAuthor(userIdentity);
+ versionMetadata.setSha256Hex(sha256Hex);
+
+ if (dependencyCoordinate != null) {
+ final ExtensionBundleVersionDependency versionDependency =
new ExtensionBundleVersionDependency();
+
versionDependency.setGroupId(dependencyCoordinate.getGroupId());
+
versionDependency.setArtifactId(dependencyCoordinate.getArtifactId());
+
versionDependency.setVersion(dependencyCoordinate.getVersion());
+ versionMetadata.setDependency(versionDependency);
+ }
+
+ validate(versionMetadata, "Cannot create extension bundle
version");
+
+ final ExtensionBundleVersionEntity versionEntity =
DataModelMapper.map(versionMetadata);
+ metadataService.createExtensionBundleVersion(versionEntity);
+
+ // persist the content of the bundle to the persistence
provider
+ final ExtensionBundleContext context = new
StandardExtensionBundleContext.Builder()
+ .bundleType(getProviderBundleType(bundleType))
+ .bucketId(existingBucket.getId())
+ .bucketName(existingBucket.getName())
+ .bundleId(extensionBundle.getId())
+ .bundleGroupId(extensionBundle.getGroupId())
+ .bundleArtifactId(extensionBundle.getArtifactId())
+ .bundleVersion(versionMetadata.getVersion())
+ .author(versionMetadata.getAuthor())
+ .timestamp(versionMetadata.getTimestamp())
+ .build();
+
+ try (final InputStream in = new
FileInputStream(extensionWorkingFile);
+ final InputStream bufIn = new BufferedInputStream(in)) {
+ bundlePersistenceProvider.saveBundleVersion(context,
bufIn);
+ LOGGER.debug("Bundle saved to persistence provider - '{}'
- '{}' - '{}'",
+ new Object[]{groupId, artifactId, version});
+ }
+
+ // get the updated extension bundle so it contains the correct
version count
+ final ExtensionBundleEntity updatedBundle =
metadataService.getExtensionBundle(bucketIdentifier, groupId, artifactId);
+
+ // create the full ExtensionBundleVersion instance to return
+ final ExtensionBundleVersion extensionBundleVersion = new
ExtensionBundleVersion();
+ extensionBundleVersion.setVersionMetadata(versionMetadata);
+
extensionBundleVersion.setExtensionBundle(DataModelMapper.map(existingBucket,
updatedBundle));
+
extensionBundleVersion.setBucket(DataModelMapper.map(existingBucket));
+ return extensionBundleVersion;
+
+ } finally {
+ if (extensionWorkingFile.exists()) {
+ try {
+ extensionWorkingFile.delete();
+ } catch (Exception e) {
+ LOGGER.warn("Error removing temporary extension bundle
file at {}",
+ new
Object[]{extensionWorkingFile.getAbsolutePath()});
+ }
+ }
+ }
+ }
+
+ private ExtensionBundleEntity getOrCreateExtensionBundle(final String
bucketId, final String groupId,
+ final String
artifactId, final ExtensionBundleType bundleType) {
+ ExtensionBundleEntity existingBundleEntity =
metadataService.getExtensionBundle(bucketId, groupId, artifactId);
+ if (existingBundleEntity == null) {
+ final ExtensionBundle bundle = new ExtensionBundle();
+ bundle.setIdentifier(UUID.randomUUID().toString());
+ bundle.setBucketIdentifier(bucketId);
+ bundle.setName(groupId + " - " + artifactId);
--- End diff --
Don't know how others feel about this, but personally I would have a slight
preference for using the Maven coordinate style here for the inferred bundle
name, i.e.:
bundle.setName(groupId + ":" + artifactId);
---