Github user bbende commented on a diff in the pull request:

    https://github.com/apache/nifi-registry/pull/148#discussion_r236849819
  
    --- Diff: 
nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/extension/StandardExtensionService.java
 ---
    @@ -0,0 +1,589 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.registry.service.extension;
    +
    +import org.apache.commons.codec.binary.Hex;
    +import org.apache.commons.codec.digest.DigestUtils;
    +import org.apache.commons.io.IOUtils;
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.commons.lang3.Validate;
    +import org.apache.nifi.registry.bucket.Bucket;
    +import org.apache.nifi.registry.db.entity.BucketEntity;
    +import org.apache.nifi.registry.db.entity.ExtensionBundleEntity;
    +import org.apache.nifi.registry.db.entity.ExtensionBundleEntityType;
    +import org.apache.nifi.registry.db.entity.ExtensionBundleVersionEntity;
    +import org.apache.nifi.registry.exception.ResourceNotFoundException;
    +import org.apache.nifi.registry.extension.BundleCoordinate;
    +import org.apache.nifi.registry.extension.BundleDetails;
    +import org.apache.nifi.registry.extension.BundleExtractor;
    +import org.apache.nifi.registry.extension.ExtensionBundle;
    +import org.apache.nifi.registry.extension.ExtensionBundleContext;
    +import 
org.apache.nifi.registry.extension.ExtensionBundlePersistenceProvider;
    +import org.apache.nifi.registry.extension.ExtensionBundleType;
    +import org.apache.nifi.registry.extension.ExtensionBundleVersion;
    +import org.apache.nifi.registry.extension.ExtensionBundleVersionDependency;
    +import org.apache.nifi.registry.extension.ExtensionBundleVersionMetadata;
    +import org.apache.nifi.registry.extension.repo.ExtensionRepoArtifact;
    +import org.apache.nifi.registry.extension.repo.ExtensionRepoBucket;
    +import org.apache.nifi.registry.extension.repo.ExtensionRepoGroup;
    +import org.apache.nifi.registry.extension.repo.ExtensionRepoVersionSummary;
    +import org.apache.nifi.registry.properties.NiFiRegistryProperties;
    +import 
org.apache.nifi.registry.provider.extension.StandardExtensionBundleContext;
    +import org.apache.nifi.registry.security.authorization.user.NiFiUserUtils;
    +import org.apache.nifi.registry.service.DataModelMapper;
    +import org.apache.nifi.registry.service.MetadataService;
    +import org.apache.nifi.registry.util.FileUtils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import org.springframework.beans.factory.annotation.Autowired;
    +import org.springframework.stereotype.Service;
    +
    +import javax.validation.ConstraintViolation;
    +import javax.validation.ConstraintViolationException;
    +import javax.validation.Validator;
    +import java.io.BufferedInputStream;
    +import java.io.File;
    +import java.io.FileInputStream;
    +import java.io.FileOutputStream;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.OutputStream;
    +import java.security.DigestInputStream;
    +import java.security.MessageDigest;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.SortedSet;
    +import java.util.TreeSet;
    +import java.util.UUID;
    +import java.util.stream.Collectors;
    +
    +@Service
    +public class StandardExtensionService implements ExtensionService {
    +
    +    private static final Logger LOGGER = 
LoggerFactory.getLogger(StandardExtensionService.class);
    +
    +    private final MetadataService metadataService;
    +    private final Map<ExtensionBundleType, BundleExtractor> extractors;
    +    private final ExtensionBundlePersistenceProvider 
bundlePersistenceProvider;
    +    private final Validator validator;
    +    private final File extensionsWorkingDir;
    +
    +    @Autowired
    +    public StandardExtensionService(final MetadataService metadataService,
    +                                    final Map<ExtensionBundleType, 
BundleExtractor> extractors,
    +                                    final 
ExtensionBundlePersistenceProvider bundlePersistenceProvider,
    +                                    final Validator validator,
    +                                    final NiFiRegistryProperties 
properties) {
    +        this.metadataService = metadataService;
    +        this.extractors = extractors;
    +        this.bundlePersistenceProvider = bundlePersistenceProvider;
    +        this.validator = validator;
    +        this.extensionsWorkingDir = 
properties.getExtensionsWorkingDirectory();
    +        Validate.notNull(this.metadataService);
    +        Validate.notNull(this.extractors);
    +        Validate.notNull(this.bundlePersistenceProvider);
    +        Validate.notNull(this.validator);
    +        Validate.notNull(this.extensionsWorkingDir);
    +    }
    +
    +    private <T>  void validate(T t, String invalidMessage) {
    +        final Set<ConstraintViolation<T>> violations = 
validator.validate(t);
    +        if (violations.size() > 0) {
    +            throw new ConstraintViolationException(invalidMessage, 
violations);
    +        }
    +    }
    +
    +    @Override
    +    public ExtensionBundleVersion createExtensionBundleVersion(final 
String bucketIdentifier, final ExtensionBundleType bundleType,
    +                                                               final 
InputStream inputStream) throws IOException {
    +        if (StringUtils.isBlank(bucketIdentifier)) {
    +            throw new IllegalArgumentException("Bucket identifier cannot 
be null or blank");
    +        }
    +
    +        if (bundleType == null) {
    +            throw new IllegalArgumentException("Bundle type cannot be 
null");
    +        }
    +
    +        if (inputStream == null) {
    +            throw new IllegalArgumentException("Extension bundle input 
stream cannot be null");
    +        }
    +
    +        if (!extractors.containsKey(bundleType)) {
    +            throw new IllegalArgumentException("No metadata extractor is 
registered for bundle-type: " + bundleType);
    +        }
    +
    +        // ensure the bucket exists
    +        final BucketEntity existingBucket = 
metadataService.getBucketById(bucketIdentifier);
    +        if (existingBucket == null) {
    +            LOGGER.warn("The specified bucket id [{}] does not exist.", 
bucketIdentifier);
    +            throw new ResourceNotFoundException("The specified bucket ID 
does not exist in this registry.");
    +        }
    +
    +        // ensure the extensions directory exists and we can read and 
write to it
    +        
FileUtils.ensureDirectoryExistAndCanReadAndWrite(extensionsWorkingDir);
    +
    +        final String extensionWorkingFilename = 
UUID.randomUUID().toString();
    +        final File extensionWorkingFile = new File(extensionsWorkingDir, 
extensionWorkingFilename);
    +        LOGGER.debug("Writing bundle contents to working directory at {}", 
new Object[]{extensionWorkingFile.getAbsolutePath()});
    +
    +        try {
    +            // write the contents of the input stream to a temporary file 
in the extensions working directory
    +            final MessageDigest sha256Digest = 
DigestUtils.getSha256Digest();
    +            try (final DigestInputStream digestInputStream = new 
DigestInputStream(inputStream, sha256Digest);
    +                 final OutputStream out = new 
FileOutputStream(extensionWorkingFile)) {
    +                IOUtils.copy(digestInputStream, out);
    +            }
    +
    +            final String sha256Hex = 
Hex.encodeHexString(sha256Digest.digest());
    +
    +            // extract the details of the bundle from the temp file in the 
working directory
    +            final BundleDetails bundleDetails;
    +            try (final InputStream in = new 
FileInputStream(extensionWorkingFile)) {
    +                final BundleExtractor extractor = 
extractors.get(bundleType);
    +                bundleDetails = extractor.extract(in);
    +            }
    +
    +            final BundleCoordinate bundleCoordinate = 
bundleDetails.getBundleCoordinate();
    +            final BundleCoordinate dependencyCoordinate = 
bundleDetails.getDependencyBundleCoordinate();
    +
    +            final String groupId = bundleCoordinate.getGroupId();
    +            final String artifactId = bundleCoordinate.getArtifactId();
    +            final String version = bundleCoordinate.getVersion();
    +            LOGGER.debug("Extracted bundle details - '{}' - '{}' - '{}'", 
new Object[]{groupId, artifactId, version});
    +
    +            // a bundle with the same group, artifact, and version can 
exist in multiple buckets, but only if it contains the same binary content,
    +            // we can determine that by comparing the SHA-256 digest of 
the incoming bundle against existing bundles with the same group, artifact, 
version
    +            final List<ExtensionBundleVersionEntity> allExistingVersions = 
metadataService.getExtensionBundleVersionsGlobal(groupId, artifactId, version);
    +            for (final ExtensionBundleVersionEntity existingVersionEntity 
: allExistingVersions) {
    +                if 
(!existingVersionEntity.getSha256Hex().equals(sha256Hex)) {
    +                    throw new IllegalStateException("Found existing 
extension bundle with same group, artifact, and version, but different SHA-256 
check-sum");
    +                }
    +            }
    +
    +            // get the existing extension bundle entity, or create a new 
one if one does not exist in the bucket with the group + artifact
    +            final ExtensionBundleEntity extensionBundle = 
getOrCreateExtensionBundle(bucketIdentifier, groupId, artifactId, bundleType);
    +
    +            // ensure there isn't already a version of the bundle with the 
same version
    +            final ExtensionBundleVersionEntity existingVersion = 
metadataService.getExtensionBundleVersion(bucketIdentifier, groupId, 
artifactId, version);
    +            if (existingVersion != null) {
    +                LOGGER.warn("The specified version [{}] already exists for 
extension bundle [{}].", new Object[]{version, extensionBundle.getId()});
    +                throw new IllegalStateException("The specified version 
already exists for the given extension bundle");
    +            }
    +
    +            // create the bundle version in the metadata db
    +            final String userIdentity = 
NiFiUserUtils.getNiFiUserIdentity();
    +            final long bundleCreatedTime = 
extensionBundle.getCreated().getTime();
    +
    +            final ExtensionBundleVersionMetadata versionMetadata = new 
ExtensionBundleVersionMetadata();
    +            versionMetadata.setId(UUID.randomUUID().toString());
    +            versionMetadata.setExtensionBundleId(extensionBundle.getId());
    +            versionMetadata.setBucketId(bucketIdentifier);
    +            versionMetadata.setVersion(version);
    +            versionMetadata.setTimestamp(bundleCreatedTime);
    +            versionMetadata.setAuthor(userIdentity);
    +            versionMetadata.setSha256Hex(sha256Hex);
    +
    +            if (dependencyCoordinate != null) {
    +                final ExtensionBundleVersionDependency versionDependency = 
new ExtensionBundleVersionDependency();
    +                
versionDependency.setGroupId(dependencyCoordinate.getGroupId());
    +                
versionDependency.setArtifactId(dependencyCoordinate.getArtifactId());
    +                
versionDependency.setVersion(dependencyCoordinate.getVersion());
    +                versionMetadata.setDependency(versionDependency);
    +            }
    +
    +            validate(versionMetadata, "Cannot create extension bundle 
version");
    +
    +            final ExtensionBundleVersionEntity versionEntity = 
DataModelMapper.map(versionMetadata);
    +            metadataService.createExtensionBundleVersion(versionEntity);
    +
    +            // persist the content of the bundle to the persistence 
provider
    +            final ExtensionBundleContext context = new 
StandardExtensionBundleContext.Builder()
    +                    .bundleType(getProviderBundleType(bundleType))
    +                    .bucketId(existingBucket.getId())
    +                    .bucketName(existingBucket.getName())
    +                    .bundleId(extensionBundle.getId())
    +                    .bundleGroupId(extensionBundle.getGroupId())
    +                    .bundleArtifactId(extensionBundle.getArtifactId())
    +                    .bundleVersion(versionMetadata.getVersion())
    +                    .author(versionMetadata.getAuthor())
    +                    .timestamp(versionMetadata.getTimestamp())
    +                    .build();
    +
    +            try (final InputStream in = new 
FileInputStream(extensionWorkingFile);
    +                 final InputStream bufIn = new BufferedInputStream(in)) {
    +                bundlePersistenceProvider.saveBundleVersion(context, 
bufIn);
    +                LOGGER.debug("Bundle saved to persistence provider - '{}' 
- '{}' - '{}'",
    +                        new Object[]{groupId, artifactId, version});
    +            }
    +
    +            // get the updated extension bundle so it contains the correct 
version count
    +            final ExtensionBundleEntity updatedBundle = 
metadataService.getExtensionBundle(bucketIdentifier, groupId, artifactId);
    +
    +            // create the full ExtensionBundleVersion instance to return
    +            final ExtensionBundleVersion extensionBundleVersion = new 
ExtensionBundleVersion();
    +            extensionBundleVersion.setVersionMetadata(versionMetadata);
    +            
extensionBundleVersion.setExtensionBundle(DataModelMapper.map(existingBucket, 
updatedBundle));
    +            
extensionBundleVersion.setBucket(DataModelMapper.map(existingBucket));
    +            return extensionBundleVersion;
    +
    +        } finally {
    +            if (extensionWorkingFile.exists()) {
    +                try {
    +                    extensionWorkingFile.delete();
    +                } catch (Exception e) {
    +                    LOGGER.warn("Error removing temporary extension bundle 
file at {}",
    +                            new 
Object[]{extensionWorkingFile.getAbsolutePath()});
    +                }
    +            }
    +        }
    +    }
    +
    +    private ExtensionBundleEntity getOrCreateExtensionBundle(final String 
bucketId, final String groupId,
    +                                                             final String 
artifactId, final ExtensionBundleType bundleType) {
    +        ExtensionBundleEntity existingBundleEntity = 
metadataService.getExtensionBundle(bucketId, groupId, artifactId);
    +        if (existingBundleEntity == null) {
    +            final ExtensionBundle bundle = new ExtensionBundle();
    +            bundle.setIdentifier(UUID.randomUUID().toString());
    +            bundle.setBucketIdentifier(bucketId);
    +            bundle.setName(groupId + " - " + artifactId);
    --- End diff --
    
    Makes sense, I'm on-board with that


---

Reply via email to