Repository: asterixdb
Updated Branches:
  refs/heads/master e0cf97012 -> d1d047b5e


[ASTERIXDB-1443][FEED] Remove Frame Distributor

- user model changes: no
- storage format changes: no
- interface changes: no

details:
- FrameDistributor and DistributeFeedFrameWriter are not used
  anymore.

Change-Id: I27c1ff99ce797923dd709d181387560e4f9448a5
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1853
Sonar-Qube: Jenkins <[email protected]>
Reviewed-by: Xikui Wang <[email protected]>
Integration-Tests: Jenkins <[email protected]>
Tested-by: Jenkins <[email protected]>
BAD: Jenkins <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/d1d047b5
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/d1d047b5
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/d1d047b5

Branch: refs/heads/master
Commit: d1d047b5e4d8a097c6b0c9d49058fc4ffe137cf6
Parents: e0cf970
Author: Abdullah Alamoudi <[email protected]>
Authored: Fri Jun 23 19:09:34 2017 -0700
Committer: abdullah alamoudi <[email protected]>
Committed: Fri Jun 23 21:17:27 2017 -0700

----------------------------------------------------------------------
 .../dataflow/DistributeFeedFrameWriter.java     | 116 ------------
 .../feed/dataflow/FrameDistributor.java         | 186 -------------------
 2 files changed, 302 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d1d047b5/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/DistributeFeedFrameWriter.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/DistributeFeedFrameWriter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/DistributeFeedFrameWriter.java
deleted file mode 100644
index ae2e0b9..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/DistributeFeedFrameWriter.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.dataflow;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.apache.asterix.active.EntityId;
-import org.apache.asterix.external.feed.management.FeedConnectionId;
-import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-/**
- * Provides mechanism for distributing the frames, as received from an 
operator to a
- * set of registered readers. Each reader typically operates at a different 
pace. Readers
- * are isolated from each other to ensure that a slow reader does not impact 
the progress of
- * others.
- **/
-public class DistributeFeedFrameWriter implements IFrameWriter {
-
-    /** A unique identifier for the feed to which the incoming tuples belong. 
**/
-    private final EntityId feedId;
-
-    /**
-     * An instance of FrameDistributor that provides the mechanism for 
distributing a frame to multiple readers, each
-     * operating in isolation.
-     **/
-    private final FrameDistributor frameDistributor;
-
-    /** The original frame writer instantiated as part of job creation **/
-    private final IFrameWriter writer;
-
-    /** The feed operation whose output is being distributed by the 
DistributeFeedFrameWriter **/
-    private final FeedRuntimeType feedRuntimeType;
-
-    /** The value of the partition 'i' if this is the i'th instance of the 
associated operator **/
-    private final int partition;
-
-    public DistributeFeedFrameWriter(EntityId feedId, IFrameWriter writer, 
FeedRuntimeType feedRuntimeType,
-            int partition) throws IOException {
-        this.feedId = feedId;
-        this.frameDistributor = new FrameDistributor();
-        this.feedRuntimeType = feedRuntimeType;
-        this.partition = partition;
-        this.writer = writer;
-    }
-
-    /**
-     * @param fpa
-     *            Feed policy accessor
-     * @param nextOnlyWriter
-     *            the writer which will deliver the buffers
-     * @param connectionId
-     *            (Dataverse - Dataset - Feed)
-     * @return A frame collector.
-     * @throws HyracksDataException
-     */
-    public void subscribe(FeedFrameCollector collector) throws 
HyracksDataException {
-        frameDistributor.registerFrameCollector(collector);
-    }
-
-    public void unsubscribeFeed(FeedConnectionId connectionId) throws 
HyracksDataException {
-        frameDistributor.deregisterFrameCollector(connectionId);
-    }
-
-    @Override
-    public void close() throws HyracksDataException {
-        try {
-            frameDistributor.close();
-        } finally {
-            writer.close();
-        }
-    }
-
-    @Override
-    public void fail() throws HyracksDataException {
-        writer.fail();
-    }
-
-    @Override
-    public void nextFrame(ByteBuffer frame) throws HyracksDataException {
-        frameDistributor.nextFrame(frame);
-    }
-
-    @Override
-    public void open() throws HyracksDataException {
-        writer.open();
-    }
-
-    @Override
-    public String toString() {
-        return feedId.toString() + feedRuntimeType + "[" + partition + "]";
-    }
-
-    @Override
-    public void flush() throws HyracksDataException {
-        frameDistributor.flush();
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/d1d047b5/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameDistributor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameDistributor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameDistributor.java
deleted file mode 100644
index 6ca4b77..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameDistributor.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.dataflow;
-
-import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-
-import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.asterix.common.exceptions.RuntimeDataException;
-import org.apache.asterix.external.feed.dataflow.FeedFrameCollector.State;
-import org.apache.asterix.external.feed.management.FeedConnectionId;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.log4j.Logger;
-
-public class FrameDistributor implements IFrameWriter {
-
-    public static final Logger LOGGER = 
Logger.getLogger(FrameDistributor.class.getName());
-    /** A map storing the registered frame readers ({@code 
FeedFrameCollector}. **/
-    private final Map<FeedConnectionId, FeedFrameCollector> 
registeredCollectors;
-    private Throwable rootFailureCause = null;
-
-    public FrameDistributor() throws HyracksDataException {
-        this.registeredCollectors = new HashMap<FeedConnectionId, 
FeedFrameCollector>();
-    }
-
-    public synchronized void registerFrameCollector(FeedFrameCollector 
frameCollector) throws HyracksDataException {
-        if (rootFailureCause != null) {
-            throw new 
RuntimeDataException(ErrorCode.FEED_DATAFLOW_FRAME_DISTR_REGISTER_FAILED_DATA_PROVIDER,
-                    rootFailureCause);
-        }
-        // registering a new collector.
-        try {
-            frameCollector.open();
-        } catch (Throwable th) {
-            rootFailureCause = th;
-            try {
-                frameCollector.fail();
-            } catch (Throwable failThrowable) {
-                th.addSuppressed(failThrowable);
-            } finally {
-                try {
-                    frameCollector.close();
-                } catch (Throwable closeThrowable) {
-                    th.addSuppressed(closeThrowable);
-                }
-            }
-            throw th;
-        }
-        registeredCollectors.put(frameCollector.getConnectionId(), 
frameCollector);
-    }
-
-    public synchronized void deregisterFrameCollector(FeedFrameCollector 
frameCollector) throws HyracksDataException {
-        deregisterFrameCollector(frameCollector.getConnectionId());
-    }
-
-    public synchronized void deregisterFrameCollector(FeedConnectionId 
connectionId) throws HyracksDataException {
-        if (rootFailureCause != null) {
-            throw new 
RuntimeDataException(ErrorCode.FEED_DATAFLOW_FRAME_DISTR_REGISTER_FAILED_DATA_PROVIDER,
-                    rootFailureCause);
-        }
-        FeedFrameCollector frameCollector = removeFrameCollector(connectionId);
-        try {
-            frameCollector.close();
-        } catch (Throwable th) {
-            rootFailureCause = th;
-            throw th;
-        }
-    }
-
-    public synchronized FeedFrameCollector 
removeFrameCollector(FeedConnectionId connectionId) {
-        return registeredCollectors.remove(connectionId);
-    }
-
-    /*
-     * Fix. What should be done?:
-     * 0. mark failure so no one can subscribe or unsubscribe.
-     * 1. Throw the throwable.
-     * 2. when fail() is called, call fail on all subscribers
-     * 3. close all the subscribers.
-     * (non-Javadoc)
-     * @see 
org.apache.hyracks.api.comm.IFrameWriter#nextFrame(java.nio.ByteBuffer)
-     */
-    @Override
-    public synchronized void nextFrame(ByteBuffer frame) throws 
HyracksDataException {
-        if (rootFailureCause != null) {
-            throw new HyracksDataException(rootFailureCause);
-        }
-        for (FeedFrameCollector collector : registeredCollectors.values()) {
-            try {
-                collector.nextFrame(frame);
-            } catch (Throwable th) {
-                rootFailureCause = th;
-                throw th;
-            }
-        }
-    }
-
-    @Override
-    public void fail() throws HyracksDataException {
-        Collection<FeedFrameCollector> collectors = 
registeredCollectors.values();
-        Iterator<FeedFrameCollector> it = collectors.iterator();
-        while (it.hasNext()) {
-            FeedFrameCollector collector = it.next();
-            try {
-                collector.fail();
-            } catch (Throwable th) {
-                while (it.hasNext()) {
-                    FeedFrameCollector innerCollector = it.next();
-                    try {
-                        innerCollector.fail();
-                    } catch (Throwable innerTh) {
-                        th.addSuppressed(innerTh);
-                    }
-                }
-                throw th;
-            }
-        }
-    }
-
-    @Override
-    public void close() throws HyracksDataException {
-        Collection<FeedFrameCollector> collectors = 
registeredCollectors.values();
-        Iterator<FeedFrameCollector> it = collectors.iterator();
-        while (it.hasNext()) {
-            FeedFrameCollector collector = it.next();
-            try {
-                collector.close();
-            } catch (Throwable th) {
-                while (it.hasNext()) {
-                    FeedFrameCollector innerCollector = it.next();
-                    try {
-                        innerCollector.close();
-                    } catch (Throwable innerTh) {
-                        th.addSuppressed(innerTh);
-                    } finally {
-                        innerCollector.setState(State.FINISHED);
-                    }
-                }
-                // resume here
-                throw th;
-            } finally {
-                collector.setState(State.FINISHED);
-            }
-        }
-    }
-
-    @Override
-    public void flush() throws HyracksDataException {
-        if (rootFailureCause != null) {
-            throw new HyracksDataException(rootFailureCause);
-        }
-        for (FeedFrameCollector collector : registeredCollectors.values()) {
-            try {
-                collector.flush();
-            } catch (Throwable th) {
-                rootFailureCause = th;
-                throw th;
-            }
-        }
-    }
-
-    @Override
-    public void open() throws HyracksDataException {
-        // Nothing to do here :)
-    }
-}

Reply via email to