http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/73384b23/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java ---------------------------------------------------------------------- diff --cc nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java index 0000000,e516f20..e34e043 mode 000000,100644..100644 --- a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java +++ b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java @@@ -1,0 -1,541 +1,541 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.nifi.controller; + + import static java.util.Objects.requireNonNull; + + import java.util.ArrayList; + import java.util.Collection; + import java.util.Collections; + import java.util.HashSet; + import java.util.List; + import java.util.Set; + import java.util.concurrent.TimeUnit; + import java.util.concurrent.atomic.AtomicBoolean; + import java.util.concurrent.atomic.AtomicLong; + import java.util.concurrent.atomic.AtomicReference; + import java.util.concurrent.locks.Lock; + import java.util.concurrent.locks.ReadWriteLock; + import java.util.concurrent.locks.ReentrantReadWriteLock; + + import org.apache.nifi.components.ValidationResult; + import org.apache.nifi.connectable.Connectable; + import org.apache.nifi.connectable.ConnectableType; + import org.apache.nifi.connectable.Connection; + import org.apache.nifi.connectable.Funnel; + import org.apache.nifi.connectable.Position; + import org.apache.nifi.flowfile.FlowFile; + import org.apache.nifi.groups.ProcessGroup; + import org.apache.nifi.processor.ProcessContext; + import org.apache.nifi.processor.ProcessSession; + import org.apache.nifi.processor.ProcessSessionFactory; + import org.apache.nifi.processor.Relationship; + import org.apache.nifi.processor.exception.ProcessException; + import org.apache.nifi.scheduling.SchedulingStrategy; + import org.apache.nifi.util.FormatUtils; + + import org.apache.commons.lang3.builder.ToStringBuilder; + import org.apache.commons.lang3.builder.ToStringStyle; + + public class StandardFunnel implements Funnel { + + public static final long MINIMUM_PENALIZATION_MILLIS = 0L; + public static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.MILLISECONDS; + public static final long MINIMUM_YIELD_MILLIS = 0L; + public static final long DEFAULT_YIELD_PERIOD = 1000L; + public static final TimeUnit DEFAULT_YIELD_TIME_UNIT = TimeUnit.MILLISECONDS; + + private final String identifier; + private final Set<Connection> outgoingConnections; + private final List<Connection> incomingConnections; + private final List<Relationship> relationships; + + private final AtomicReference<ProcessGroup> processGroupRef; + private final AtomicReference<Position> position; + private final AtomicReference<String> penalizationPeriod; + private final AtomicReference<String> yieldPeriod; + private final AtomicReference<String> schedulingPeriod; + private final AtomicReference<String> name; + private final AtomicLong schedulingNanos; + private final AtomicBoolean lossTolerant; + private final AtomicReference<ScheduledState> scheduledState; + private final AtomicLong yieldExpiration; + + private final ReadWriteLock rwLock = new ReentrantReadWriteLock(); + private final Lock readLock = rwLock.readLock(); + private final Lock writeLock = rwLock.writeLock(); + + public StandardFunnel(final String identifier, final ProcessGroup processGroup, final ProcessScheduler scheduler) { + this.identifier = identifier; + this.processGroupRef = new AtomicReference<>(processGroup); + + outgoingConnections = new HashSet<>(); + incomingConnections = new ArrayList<>(); + + final List<Relationship> relationships = new ArrayList<>(); + relationships.add(Relationship.ANONYMOUS); + this.relationships = Collections.unmodifiableList(relationships); + + lossTolerant = new AtomicBoolean(false); + position = new AtomicReference<>(new Position(0D, 0D)); + scheduledState = new AtomicReference<>(ScheduledState.STOPPED); + penalizationPeriod = new AtomicReference<>("30 sec"); + yieldPeriod = new AtomicReference<>("1 sec"); + yieldExpiration = new AtomicLong(0L); + schedulingPeriod = new AtomicReference<>("0 millis"); + schedulingNanos = new AtomicLong(30000); + name = new AtomicReference<>("Funnel"); + } + + @Override + public String getIdentifier() { + return identifier; + } + + @Override + public Collection<Relationship> getRelationships() { + return relationships; + } + + @Override + public Relationship getRelationship(final String relationshipName) { + return (Relationship.ANONYMOUS.getName().equals(relationshipName)) ? Relationship.ANONYMOUS : null; + } + + @Override + public void addConnection(final Connection connection) throws IllegalArgumentException { + writeLock.lock(); + try { + if (!requireNonNull(connection).getSource().equals(this) && !connection.getDestination().equals(this)) { + throw new IllegalArgumentException("Cannot add a connection to a Funnel for which the Funnel is neither the Source nor the Destination"); + } + if (connection.getSource().equals(this) && connection.getDestination().equals(this)) { + throw new IllegalArgumentException("Cannot add a connection from a Funnel back to itself"); + } + + if (connection.getDestination().equals(this)) { + // don't add the connection twice. This may occur if we have a self-loop because we will be told + // to add the connection once because we are the source and again because we are the destination. + if (!incomingConnections.contains(connection)) { + incomingConnections.add(connection); + } + } + + if (connection.getSource().equals(this)) { + // don't add the connection twice. This may occur if we have a self-loop because we will be told + // to add the connection once because we are the source and again because we are the destination. + if (!outgoingConnections.contains(connection)) { + for (final Relationship relationship : connection.getRelationships()) { + if (!relationship.equals(Relationship.ANONYMOUS)) { + throw new IllegalArgumentException("No relationship with name " + relationship + " exists for Funnels"); + } + } + + outgoingConnections.add(connection); + } + } + } finally { + writeLock.unlock(); + } + } + + @Override + public boolean hasIncomingConnection() { + readLock.lock(); + try { + return !incomingConnections.isEmpty(); + } finally { + readLock.unlock(); + } + } + + @Override + public void updateConnection(final Connection connection) throws IllegalStateException { + if (requireNonNull(connection).getSource().equals(this)) { + writeLock.lock(); + try { + if (!outgoingConnections.remove(connection)) { + throw new IllegalStateException("No Connection with ID " + connection.getIdentifier() + " is currently registered with this Port"); + } + outgoingConnections.add(connection); + } finally { + writeLock.unlock(); + } + } + + if (connection.getDestination().equals(this)) { + writeLock.lock(); + try { + if (!incomingConnections.remove(connection)) { + throw new IllegalStateException("No Connection with ID " + connection.getIdentifier() + " is currently registered with this Port"); + } + incomingConnections.add(connection); + } finally { + writeLock.unlock(); + } + } + } + + @Override + public void removeConnection(final Connection connection) throws IllegalArgumentException, IllegalStateException { + writeLock.lock(); + try { + if (!requireNonNull(connection).getSource().equals(this)) { + final boolean existed = incomingConnections.remove(connection); + if (!existed) { + throw new IllegalStateException("The given connection is not currently registered for this ProcessorNode"); + } + return; + } + + final boolean removed = outgoingConnections.remove(connection); + if (!removed) { + throw new IllegalStateException(connection + " is not registered with " + this); + } + } finally { + writeLock.unlock(); + } + } + + @Override + public Set<Connection> getConnections() { + readLock.lock(); + try { + return Collections.unmodifiableSet(outgoingConnections); + } finally { + readLock.unlock(); + } + } + + @Override + public Set<Connection> getConnections(final Relationship relationship) { + readLock.lock(); + try { + if (relationship.equals(Relationship.ANONYMOUS)) { + return Collections.unmodifiableSet(outgoingConnections); + } + + throw new IllegalArgumentException("No relationship with name " + relationship.getName() + " exists for Funnels"); + } finally { + readLock.unlock(); + } + } + + @Override + public List<Connection> getIncomingConnections() { + readLock.lock(); + try { + return new ArrayList<>(incomingConnections); + } finally { + readLock.unlock(); + } + } + + @Override + public Position getPosition() { + return position.get(); + } + + @Override + public void setPosition(Position position) { + this.position.set(position); + } + + @Override + public String getName() { + return name.get(); + } + + /** + * Throws {@link UnsupportedOperationException} + * + * @param name + */ + @Override + public void setName(final String name) { + throw new UnsupportedOperationException(); + } + + @Override + public String getComments() { + return ""; + } + + @Override + public void setComments(final String comments) { + throw new UnsupportedOperationException(); + } + + @Override + public ProcessGroup getProcessGroup() { + return processGroupRef.get(); + } + + @Override + public void setProcessGroup(final ProcessGroup group) { + processGroupRef.set(group); + } + + @Override + public boolean isAutoTerminated(Relationship relationship) { + return false; + } + + @Override + public boolean isRunning() { + return isRunning(this); + } + + private boolean isRunning(final Connectable source) { + return getScheduledState() == ScheduledState.RUNNING; + } + + @Override + public boolean isTriggerWhenEmpty() { + return false; + } + + @Override + public ScheduledState getScheduledState() { + return scheduledState.get(); + } + + @Override + public boolean isLossTolerant() { + return lossTolerant.get(); + } + + @Override + public void setLossTolerant(final boolean lossTolerant) { + this.lossTolerant.set(lossTolerant); + } + + @Override + public String toString() { + return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE).append("id", getIdentifier()).toString(); + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException { + final ProcessSession session = sessionFactory.createSession(); + + try { + onTrigger(context, session); + session.commit(); + } catch (final ProcessException e) { + session.rollback(); + throw e; + } catch (final Throwable t) { + session.rollback(); + throw new RuntimeException(t); + } + } + + private void onTrigger(final ProcessContext context, final ProcessSession session) { + readLock.lock(); + try { - Set<Relationship> available = session.getAvailableRelationships(); ++ Set<Relationship> available = context.getAvailableRelationships(); + int transferred = 0; + while (!available.isEmpty()) { + final List<FlowFile> flowFiles = session.get(10); + if (flowFiles.isEmpty()) { + break; + } + + transferred += flowFiles.size(); + session.transfer(flowFiles, Relationship.ANONYMOUS); + session.commit(); - available = session.getAvailableRelationships(); ++ available = context.getAvailableRelationships(); + } + + if (transferred == 0) { + context.yield(); + } + } finally { + readLock.unlock(); + } + } + + /** + * Has no effect + */ + @Override + public void setMaxConcurrentTasks(int taskCount) { + } + + @Override + public int getMaxConcurrentTasks() { + return 1; + } + + @Override + public void setScheduledState(final ScheduledState scheduledState) { + this.scheduledState.set(scheduledState); + } + + @Override + public ConnectableType getConnectableType() { + return ConnectableType.FUNNEL; + } + + @Override + @SuppressWarnings("unchecked") + public Collection<ValidationResult> getValidationErrors() { + return Collections.EMPTY_LIST; + } + + /** + * Updates the amount of time that this processor should avoid being + * scheduled when the processor calls + * {@link nifi.processor.ProcessContext#yield() ProcessContext.yield()} + * + * @param yieldPeriod + */ + @Override + public void setYieldPeriod(final String yieldPeriod) { + final long yieldMillis = FormatUtils.getTimeDuration(requireNonNull(yieldPeriod), TimeUnit.MILLISECONDS); + if (yieldMillis < 0) { + throw new IllegalArgumentException("Yield duration must be positive"); + } + this.yieldPeriod.set(yieldPeriod); + } + + /** + * @param schedulingPeriod + */ + @Override + public void setScheduldingPeriod(final String schedulingPeriod) { + final long schedulingNanos = FormatUtils.getTimeDuration(requireNonNull(schedulingPeriod), TimeUnit.NANOSECONDS); + if (schedulingNanos < 0) { + throw new IllegalArgumentException("Scheduling Period must be positive"); + } + + this.schedulingPeriod.set(schedulingPeriod); + this.schedulingNanos.set(Math.max(MINIMUM_SCHEDULING_NANOS, schedulingNanos)); + } + + @Override + public long getPenalizationPeriod(final TimeUnit timeUnit) { + return FormatUtils.getTimeDuration(getPenalizationPeriod(), timeUnit == null ? DEFAULT_TIME_UNIT : timeUnit); + } + + @Override + public String getPenalizationPeriod() { + return penalizationPeriod.get(); + } + + /** + * Causes the processor not to be scheduled for some period of time. This + * duration can be obtained and set via the + * {@link #getYieldPeriod(TimeUnit)} and + * {@link #setYieldPeriod(long, TimeUnit)} methods. + */ + @Override + public void yield() { + final long yieldMillis = getYieldPeriod(TimeUnit.MILLISECONDS); + yieldExpiration.set(Math.max(yieldExpiration.get(), System.currentTimeMillis() + yieldMillis)); + } + + @Override + public long getYieldExpiration() { + return yieldExpiration.get(); + } + + @Override + public String getSchedulingPeriod() { + return schedulingPeriod.get(); + } + + @Override + public void setPenalizationPeriod(final String penalizationPeriod) { + this.penalizationPeriod.set(penalizationPeriod); + } + + @Override + public String getYieldPeriod() { + return yieldPeriod.get(); + } + + @Override + public long getYieldPeriod(final TimeUnit timeUnit) { + return FormatUtils.getTimeDuration(getYieldPeriod(), timeUnit == null ? DEFAULT_TIME_UNIT : timeUnit); + } + + @Override + public long getSchedulingPeriod(final TimeUnit timeUnit) { + return timeUnit.convert(schedulingNanos.get(), TimeUnit.NANOSECONDS); + } + + @Override + public boolean isSideEffectFree() { + return true; + } + + @Override + public void verifyCanDelete(boolean ignoreConnections) throws IllegalStateException { + if (ignoreConnections) { + return; + } + + readLock.lock(); + try { + for (final Connection connection : outgoingConnections) { + connection.verifyCanDelete(); + } + + for (final Connection connection : incomingConnections) { + if (connection.getSource().equals(this)) { + connection.verifyCanDelete(); + } else { + throw new IllegalStateException(this + " is the destination of another component"); + } + } + } finally { + readLock.unlock(); + } + } + + @Override + public void verifyCanDelete() { + verifyCanDelete(false); + } + + @Override + public void verifyCanStart() { + } + + @Override + public void verifyCanStop() { + } + + @Override + public void verifyCanUpdate() { + } + + @Override + public void verifyCanEnable() { + } + + @Override + public void verifyCanDisable() { + } + + @Override + public SchedulingStrategy getSchedulingStrategy() { + return SchedulingStrategy.TIMER_DRIVEN; + } + }
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/73384b23/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/BatchingSessionFactory.java ---------------------------------------------------------------------- diff --cc nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/BatchingSessionFactory.java index 0000000,eae2550..d5dba82 mode 000000,100644..100644 --- a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/BatchingSessionFactory.java +++ b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/BatchingSessionFactory.java @@@ -1,0 -1,247 +1,242 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.nifi.controller.repository; + + import java.io.InputStream; + import java.io.OutputStream; + import java.nio.file.Path; + import java.util.Collection; + import java.util.List; + import java.util.Map; + import java.util.Set; + import java.util.regex.Pattern; + + import org.apache.nifi.flowfile.FlowFile; + import org.apache.nifi.processor.FlowFileFilter; + import org.apache.nifi.processor.ProcessSession; + import org.apache.nifi.processor.ProcessSessionFactory; + import org.apache.nifi.processor.QueueSize; + import org.apache.nifi.processor.Relationship; + import org.apache.nifi.processor.io.InputStreamCallback; + import org.apache.nifi.processor.io.OutputStreamCallback; + import org.apache.nifi.processor.io.StreamCallback; + import org.apache.nifi.provenance.ProvenanceReporter; + + public class BatchingSessionFactory implements ProcessSessionFactory { + + private final HighThroughputSession highThroughputSession; + + public BatchingSessionFactory(final StandardProcessSession standardProcessSession) { + highThroughputSession = new HighThroughputSession(standardProcessSession); + } + + @Override + public ProcessSession createSession() { + return highThroughputSession; + } + + private class HighThroughputSession implements ProcessSession { + + private final StandardProcessSession session; + + public HighThroughputSession(final StandardProcessSession session) { + this.session = session; + } + + @Override + public void commit() { + session.checkpoint(); + } + + @Override + public void rollback() { + session.rollback(); + } + + @Override + public void rollback(boolean penalize) { + session.rollback(penalize); + } + + @Override + public void adjustCounter(String name, long delta, boolean immediate) { + session.adjustCounter(name, delta, immediate); + } + + @Override + public FlowFile get() { + return session.get(); + } + + @Override + public List<FlowFile> get(int maxResults) { + return session.get(maxResults); + } + + @Override + public List<FlowFile> get(FlowFileFilter filter) { + return session.get(filter); + } + + @Override + public QueueSize getQueueSize() { + return session.getQueueSize(); + } + + @Override - public Set<Relationship> getAvailableRelationships() { - return session.getAvailableRelationships(); - } - - @Override + public FlowFile create() { + return session.create(); + } + + @Override + public FlowFile create(FlowFile parent) { + return session.create(parent); + } + + @Override + public FlowFile create(Collection<FlowFile> parents) { + return session.create(parents); + } + + @Override + public FlowFile clone(FlowFile example) { + return session.clone(example); + } + + @Override + public FlowFile clone(FlowFile example, long offset, long size) { + return session.clone(example, offset, size); + } + + @Override + public FlowFile penalize(FlowFile flowFile) { + return session.penalize(flowFile); + } + + @Override + public FlowFile putAttribute(FlowFile flowFile, String key, String value) { + return session.putAttribute(flowFile, key, value); + } + + @Override + public FlowFile putAllAttributes(FlowFile flowFile, Map<String, String> attributes) { + return session.putAllAttributes(flowFile, attributes); + } + + @Override + public FlowFile removeAttribute(FlowFile flowFile, String key) { + return session.removeAttribute(flowFile, key); + } + + @Override + public FlowFile removeAllAttributes(FlowFile flowFile, Set<String> keys) { + return session.removeAllAttributes(flowFile, keys); + } + + @Override + public FlowFile removeAllAttributes(FlowFile flowFile, Pattern keyPattern) { + return session.removeAllAttributes(flowFile, keyPattern); + } + + @Override + public void transfer(FlowFile flowFile, Relationship relationship) { + session.transfer(flowFile, relationship); + } + + @Override + public void transfer(FlowFile flowFile) { + session.transfer(flowFile); + } + + @Override + public void transfer(Collection<FlowFile> flowFiles) { + session.transfer(flowFiles); + } + + @Override + public void transfer(Collection<FlowFile> flowFiles, Relationship relationship) { + session.transfer(flowFiles, relationship); + } + + @Override + public void remove(FlowFile flowFile) { + session.remove(flowFile); + } + + @Override + public void remove(Collection<FlowFile> flowFiles) { + session.remove(flowFiles); + } + + @Override + public void read(FlowFile source, InputStreamCallback reader) { + session.read(source, reader); + } + + @Override + public FlowFile merge(Collection<FlowFile> sources, FlowFile destination) { + return session.merge(sources, destination); + } + + @Override + public FlowFile merge(Collection<FlowFile> sources, FlowFile destination, byte[] header, byte[] footer, byte[] demarcator) { + return session.merge(sources, destination, header, footer, demarcator); + } + + @Override + public FlowFile write(FlowFile source, OutputStreamCallback writer) { + return session.write(source, writer); + } + + @Override + public FlowFile write(FlowFile source, StreamCallback writer) { + return session.write(source, writer); + } + + @Override + public FlowFile append(FlowFile source, OutputStreamCallback writer) { + return session.append(source, writer); + } + + @Override + public FlowFile importFrom(Path source, boolean keepSourceFile, FlowFile destination) { + return session.importFrom(source, keepSourceFile, destination); + } + + @Override + public FlowFile importFrom(InputStream source, FlowFile destination) { + return session.importFrom(source, destination); + } + + @Override + public void exportTo(FlowFile flowFile, Path destination, boolean append) { + session.exportTo(flowFile, destination, append); + } + + @Override + public void exportTo(FlowFile flowFile, OutputStream destination) { + session.exportTo(flowFile, destination); + } + + @Override + public ProvenanceReporter getProvenanceReporter() { + return session.getProvenanceReporter(); + } + + } + + }
