http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoadResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoadResponse.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoadResponse.java deleted file mode 100644 index 0c863a9..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoadResponse.java +++ /dev/null @@ -1,181 +0,0 @@

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.gridgain.grid.kernal.processors.dataload;

import org.apache.ignite.internal.util.direct.*;
import org.apache.ignite.internal.util.typedef.internal.*;

import java.nio.*;

/**
 * Response to a data load request. Carries the ID of the request being answered,
 * the error as a byte array (may be {@code null}) and the force-local-deployment flag.
 * <p>
 * The message is written to and read from a {@link ByteBuffer} field by field; see
 * {@link #writeTo(ByteBuffer)} and {@link #readFrom(ByteBuffer)} for the field order.
 */
public class GridDataLoadResponse extends GridTcpCommunicationMessageAdapter {
    /** Serial version UID. */
    private static final long serialVersionUID = 0L;

    /** ID of the request this response answers. */
    private long reqId;

    /** Error bytes. */
    private byte[] errBytes;

    /** Force local deployment flag. */
    private boolean forceLocDep;

    /**
     * @param reqId Request ID.
     * @param errBytes Error bytes.
     * @param forceLocDep Force local deployment.
     */
    public GridDataLoadResponse(long reqId, byte[] errBytes, boolean forceLocDep) {
        this.reqId = reqId;
        this.errBytes = errBytes;
        this.forceLocDep = forceLocDep;
    }

    /**
     * {@code Externalizable} support. Also used by {@link #clone()} to create the
     * instance that {@link #clone0(GridTcpCommunicationMessageAdapter)} then fills in.
     */
    public GridDataLoadResponse() {
        // No-op.
    }

    /**
     * @return Request ID.
     */
    public long requestId() {
        return reqId;
    }

    /**
     * @return Error bytes. The internal array is returned as is, without copying.
     */
    public byte[] errorBytes() {
        return errBytes;
    }

    /**
     * @return {@code True} to force local deployment.
     */
    public boolean forceLocalDeployment() {
        return forceLocDep;
    }

    /** {@inheritDoc} */
    @Override public String toString() {
        return S.toString(GridDataLoadResponse.class, this);
    }

    /**
     * {@inheritDoc}
     * <p>
     * Creates a fresh instance through the no-arg constructor and copies the fields
     * into it with {@link #clone0(GridTcpCommunicationMessageAdapter)}.
     */
    @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"})
    @Override public GridTcpCommunicationMessageAdapter clone() {
        GridDataLoadResponse _clone = new GridDataLoadResponse();

        clone0(_clone);

        return _clone;
    }

    /**
     * {@inheritDoc}
     * <p>
     * Copies the three fields into {@code _msg}, which must be a {@code GridDataLoadResponse}.
     * The error byte array is shared with the clone, not duplicated.
     */
    @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
        GridDataLoadResponse _clone = (GridDataLoadResponse)_msg;

        _clone.reqId = reqId;
        _clone.errBytes = errBytes;
        _clone.forceLocDep = forceLocDep;
    }

    /**
     * {@inheritDoc}
     * <p>
     * Writes the {@link #directType()} byte (only once, guarded by {@code commState.typeWritten}),
     * then {@code errBytes}, {@code forceLocDep} and {@code reqId}, in that order.
     * {@code commState.idx} is advanced after each field and the switch cases fall through on
     * purpose, so a repeated call starts with the first field that has not been written yet.
     *
     * @return {@code False} as soon as one of the {@code commState.put*} calls returns {@code false}
     *      (presumably when the buffer has no room left; confirm against the message state class),
     *      {@code true} once all fields have been written.
     */
    @SuppressWarnings("all")
    @Override public boolean writeTo(ByteBuffer buf) {
        commState.setBuffer(buf);

        if (!commState.typeWritten) {
            if (!commState.putByte(directType()))
                return false;

            commState.typeWritten = true;
        }

        // Intentional fall-through: each case writes one field and moves on to the next one.
        switch (commState.idx) {
            case 0:
                if (!commState.putByteArray(errBytes))
                    return false;

                commState.idx++;

            case 1:
                if (!commState.putBoolean(forceLocDep))
                    return false;

                commState.idx++;

            case 2:
                if (!commState.putLong(reqId))
                    return false;

                commState.idx++;

        }

        return true;
    }

    /**
     * {@inheritDoc}
     * <p>
     * Reads the fields in the same order {@link #writeTo(ByteBuffer)} writes them:
     * {@code errBytes}, {@code forceLocDep}, {@code reqId}. The type byte is not read here.
     * {@code commState.idx} is advanced after each field and the switch cases fall through on
     * purpose, so a repeated call continues with the first field that has not been read yet.
     *
     * @return {@code False} if the byte array could not be read or the buffer holds fewer bytes
     *      than the next field needs, {@code true} once all fields have been read.
     */
    @SuppressWarnings("all")
    @Override public boolean readFrom(ByteBuffer buf) {
        commState.setBuffer(buf);

        // Intentional fall-through: each case reads one field and moves on to the next one.
        switch (commState.idx) {
            case 0:
                byte[] errBytes0 = commState.getByteArray();

                // Identity comparison with the "not read" marker array.
                if (errBytes0 == BYTE_ARR_NOT_READ)
                    return false;

                errBytes = errBytes0;

                commState.idx++;

            case 1:
                // A boolean needs one byte.
                if (buf.remaining() < 1)
                    return false;

                forceLocDep = commState.getBoolean();

                commState.idx++;

            case 2:
                // A long needs eight bytes.
                if (buf.remaining() < 8)
                    return false;

                reqId = commState.getLong();

                commState.idx++;

        }

        return true;
    }

    /**
     * {@inheritDoc}
     *
     * @return Type code of this message, {@code 62}.
     */
    @Override public byte directType() {
        return 62;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoadUpdateJob.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoadUpdateJob.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoadUpdateJob.java deleted file mode 100644 index 799a194..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoadUpdateJob.java +++ /dev/null @@ -1,120 +0,0 @@

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.gridgain.grid.kernal.processors.dataload;

import org.apache.ignite.*;
import org.apache.ignite.dataload.*;
import org.apache.ignite.internal.*;
import org.apache.ignite.internal.processors.cache.*;
import org.apache.ignite.internal.util.lang.*;
import org.jetbrains.annotations.*;

import java.util.*;

/**
 * Job that hands a batch of entries to a cache updater on the node where it runs.
 */
class GridDataLoadUpdateJob<K, V> implements GridPlainCallable<Object> {
    /** Kernal context. */
    private final GridKernalContext ctx;

    /** Logger. */
    private final IgniteLogger log;

    /** Name of the cache to update. */
    private final String cacheName;

    /** Batch of entries passed to the updater. */
    private final Collection<Map.Entry<K, V>> col;

    /** Whether deployment ownership is ignored while the updater runs. */
    private final boolean ignoreDepOwnership;

    /** Whether the update goes through the skip-store view of the cache. */
    private final boolean skipStore;

    /** Updater that applies the batch to the cache. */
    private final IgniteDataLoadCacheUpdater<K, V> updater;

    /**
     * @param ctx Context.
     * @param log Log.
     * @param cacheName Cache name.
     * @param col Entries to put, must be non-empty.
     * @param ignoreDepOwnership {@code True} to ignore deployment ownership.
     * @param skipStore {@code True} to update the cache through {@code withSkipStore()}.
     * @param updater Updater.
     */
    GridDataLoadUpdateJob(
        GridKernalContext ctx,
        IgniteLogger log,
        @Nullable String cacheName,
        Collection<Map.Entry<K, V>> col,
        boolean ignoreDepOwnership,
        boolean skipStore,
        IgniteDataLoadCacheUpdater<K, V> updater) {
        assert col != null && !col.isEmpty();
        assert updater != null;

        this.ctx = ctx;
        this.log = log;
        this.cacheName = cacheName;
        this.col = col;
        this.updater = updater;
        this.skipStore = skipStore;
        this.ignoreDepOwnership = ignoreDepOwnership;
    }

    /**
     * Runs the updater against the cache. When requested, deployment ownership checks are
     * switched off for the duration of the update and switched back on afterwards, whether
     * or not the updater failed.
     *
     * @return Always {@code null}.
     * @throws Exception If the updater fails.
     */
    @Override public Object call() throws Exception {
        if (log.isDebugEnabled())
            log.debug("Running put job [nodeId=" + ctx.localNodeId() + ", size=" + col.size() + ']');

// TODO IGNITE-77: restore adapter usage.
//        GridCacheAdapter<Object, Object> cache = ctx.cache().internalCache(cacheName);
//
//        IgniteFuture<?> f = cache.context().preloader().startFuture();
//
//        if (!f.isDone())
//            f.get();
//
//        if (ignoreDepOwnership)
//            cache.context().deploy().ignoreOwnership(true);

        IgniteCacheProxy<K, V> proxy = ctx.cache().jcache(cacheName);

        // The skip-store view, when requested, is the object used for everything below.
        final IgniteCacheProxy<K, V> target =
            skipStore ? (IgniteCacheProxy<K, V>)proxy.withSkipStore() : proxy;

        if (ignoreDepOwnership)
            target.context().deploy().ignoreOwnership(true);

        try {
            updater.update(target, col);
        }
        finally {
            if (ignoreDepOwnership)
                target.context().deploy().ignoreOwnership(false);

            if (log.isDebugEnabled())
                log.debug("Update job finished on node: " + ctx.localNodeId());
        }

        return null;
    }
}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoaderFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoaderFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoaderFuture.java deleted file mode 100644 index 58cb726..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoaderFuture.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */

package org.gridgain.grid.kernal.processors.dataload;

import org.apache.ignite.*;
import org.apache.ignite.internal.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.internal.util.future.*;
import org.apache.ignite.internal.util.tostring.*;

import java.io.*;

/**
 * Future bound to a data loader: cancelling the future closes the loader with cancellation.
 */
class GridDataLoaderFuture extends GridFutureAdapter<Object> {
    /** Serial version UID. */
    private static final long serialVersionUID = 0L;

    /** Loader that is closed when this future is cancelled. */
    @GridToStringExclude
    private IgniteDataLoader dataLdr;

    /**
     * No-arg constructor required by {@link Externalizable}.
     */
    public GridDataLoaderFuture() {
        // No-op.
    }

    /**
     * @param ctx Context.
     * @param dataLdr Data loader, must not be {@code null}.
     */
    GridDataLoaderFuture(GridKernalContext ctx, IgniteDataLoader dataLdr) {
        super(ctx);

        assert dataLdr != null;

        this.dataLdr = dataLdr;
    }

    /**
     * Cancels this future and, if the cancellation took effect, closes the data loader
     * with the cancel flag set.
     *
     * @return {@code True} if this call cancelled the future, {@code false} otherwise.
     * @throws IgniteCheckedException If the validity check or closing the loader fails.
     */
    @Override public boolean cancel() throws IgniteCheckedException {
        checkValid();

        if (!onCancelled())
            return false;

        dataLdr.close(true);

        return true;
    }

    /** {@inheritDoc} */
    @Override public String toString() {
        return S.toString(GridDataLoaderFuture.class, this, super.toString());
    }
}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoaderProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoaderProcessor.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoaderProcessor.java deleted file mode 100644 index 5c5e0cb..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoaderProcessor.java +++ /dev/null @@ -1,318 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License.
- */

package org.gridgain.grid.kernal.processors.dataload;

import org.apache.ignite.*;
import org.apache.ignite.dataload.*;
import org.apache.ignite.internal.*;
import org.apache.ignite.internal.processors.*;
import org.apache.ignite.internal.util.*;
import org.apache.ignite.lang.*;
import org.apache.ignite.marshaller.*;
import org.apache.ignite.thread.*;
import org.apache.ignite.internal.managers.communication.*;
import org.apache.ignite.internal.managers.deployment.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.internal.util.worker.*;
import org.jetbrains.annotations.*;

import java.util.*;
import java.util.concurrent.*;

import static org.apache.ignite.internal.GridTopic.*;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;

/**
 * Processor that creates data loaders, periodically flushes them from a background thread
 * and serves data load requests received on {@code TOPIC_DATALOAD}.
 */
public class GridDataLoaderProcessor<K, V> extends GridProcessorAdapter {
    /** Active loaders; a loader is removed when its future completes. */
    private final Collection<IgniteDataLoaderImpl<K, V>> ldrs = new GridConcurrentHashSet<>();

    /** Busy lock guarding loader creation, request processing and flushing against node stop. */
    private final GridSpinBusyLock busyLock = new GridSpinBusyLock();

    /** Flushing thread, created in {@link #start()} (not created on daemon nodes). */
    private Thread flusher;

    /** Queue of loaders waiting for their next flush. */
    private final DelayQueue<IgniteDataLoaderImpl<K, V>> flushQ = new DelayQueue<>();

    /** Marshaller. */
    private final IgniteMarshaller marsh;

    /**
     * @param ctx Kernal context.
     */
    public GridDataLoaderProcessor(GridKernalContext ctx) {
        super(ctx);

        // The marshaller must be set before the listener is registered: the listener
        // calls processDataLoadRequest(), which unmarshals with it, and a request could
        // otherwise be delivered while the constructor is still running.
        marsh = ctx.config().getMarshaller();

        ctx.io().addMessageListener(TOPIC_DATALOAD, new GridMessageListener() {
            @Override public void onMessage(UUID nodeId, Object msg) {
                assert msg instanceof GridDataLoadRequest;

                processDataLoadRequest(nodeId, (GridDataLoadRequest)msg);
            }
        });
    }

    /**
     * {@inheritDoc}
     * <p>
     * Starts the flusher thread unless this is a daemon node. The flusher takes loaders from
     * the flush queue, skips closed ones, and re-queues every loader it has flushed.
     */
    @Override public void start() throws IgniteCheckedException {
        if (ctx.config().isDaemon())
            return;

        flusher = new IgniteThread(new GridWorker(ctx.gridName(), "grid-data-loader-flusher", log) {
            @Override protected void body() throws InterruptedException, IgniteInterruptedException {
                while (!isCancelled()) {
                    IgniteDataLoaderImpl<K, V> ldr = flushQ.take();

                    // Node is stopping - leave the loop.
                    if (!busyLock.enterBusy())
                        return;

                    try {
                        // A closed loader is dropped from the queue by not re-offering it.
                        if (ldr.isClosed())
                            continue;

                        ldr.tryFlush();

                        flushQ.offer(ldr);
                    }
                    finally {
                        busyLock.leaveBusy();
                    }
                }
            }
        });

        flusher.start();

        if (log.isDebugEnabled())
            log.debug("Started data loader processor.");
    }

    /**
     * {@inheritDoc}
     * <p>
     * On non-daemon nodes: removes the request listener, blocks the busy lock, stops the
     * flusher thread and closes every loader that is still active. Failures to close a
     * loader are logged and do not interrupt the shutdown.
     */
    @Override public void onKernalStop(boolean cancel) {
        if (ctx.config().isDaemon())
            return;

        ctx.io().removeMessageListener(TOPIC_DATALOAD);

        busyLock.block();

        U.interrupt(flusher);
        U.join(flusher, log);

        for (IgniteDataLoader<?, ?> ldr : ldrs) {
            if (log.isDebugEnabled())
                log.debug("Closing active data loader on grid stop [ldr=" + ldr + ", cancel=" + cancel + ']');

            try {
                ldr.close(cancel);
            }
            catch (IgniteInterruptedException e) {
                U.warn(log, "Interrupted while waiting for completion of the data loader: " + ldr, e);
            }
            catch (IgniteCheckedException e) {
                U.error(log, "Failed to close data loader: " + ldr, e);
            }
        }

        if (log.isDebugEnabled())
            log.debug("Stopped data loader processor.");
    }

    /**
     * Creates a data loader and tracks it until its future completes.
     *
     * @param cacheName Cache name ({@code null} for default cache).
     * @param compact {@code true} if data loader should transfer data in compact format.
     * @return Data loader.
     * @throws IllegalStateException If the grid is stopping.
     */
    public IgniteDataLoader<K, V> dataLoader(@Nullable String cacheName, boolean compact) {
        if (!busyLock.enterBusy())
            throw new IllegalStateException("Failed to create data loader (grid is stopping).");

        try {
            final IgniteDataLoaderImpl<K, V> ldr = new IgniteDataLoaderImpl<>(ctx, cacheName, flushQ, compact);

            ldrs.add(ldr);

            // Stop tracking the loader once it is done.
            ldr.future().listenAsync(new CI1<IgniteFuture<?>>() {
                @Override public void apply(IgniteFuture<?> f) {
                    boolean b = ldrs.remove(ldr);

                    assert b : "Loader has not been added to set: " + ldr;

                    if (log.isDebugEnabled())
                        log.debug("Loader has been completed: " + ldr);
                }
            });

            return ldr;
        }
        finally {
            busyLock.leaveBusy();
        }
    }

    /**
     * Creates a data loader that transfers data in compact format.
     *
     * @param cacheName Cache name ({@code null} for default cache).
     * @return Data loader.
     */
    public IgniteDataLoader<K, V> dataLoader(@Nullable String cacheName) {
        return dataLoader(cacheName, true);
    }

    /**
     * Handles a data load request: resolves the response topic and the class loader,
     * unmarshals the entries and the updater, runs the update job in the calling thread
     * and sends a response carrying the job's error, if any. The request is ignored when
     * the node is stopping, and no response is sent when the response topic itself cannot
     * be unmarshalled.
     *
     * @param nodeId Sender ID.
     * @param req Request.
     */
    private void processDataLoadRequest(UUID nodeId, GridDataLoadRequest req) {
        if (!busyLock.enterBusy()) {
            if (log.isDebugEnabled())
                log.debug("Ignoring data load request (node is stopping): " + req);

            return;
        }

        try {
            if (log.isDebugEnabled())
                log.debug("Processing data load request: " + req);

            Object topic;

            try {
                topic = marsh.unmarshal(req.responseTopicBytes(), null);
            }
            catch (IgniteCheckedException e) {
                // Without the topic there is nowhere to send a response to.
                U.error(log, "Failed to unmarshal topic from request: " + req, e);

                return;
            }

            ClassLoader clsLdr;

            if (req.forceLocalDeployment())
                clsLdr = U.gridClassLoader();
            else {
                GridDeployment dep = ctx.deploy().getGlobalDeployment(
                    req.deploymentMode(),
                    req.sampleClassName(),
                    req.sampleClassName(),
                    req.userVersion(),
                    nodeId,
                    req.classLoaderId(),
                    req.participants(),
                    null);

                if (dep == null) {
                    sendResponse(nodeId,
                        topic,
                        req.requestId(),
                        new IgniteCheckedException("Failed to get deployment for request [sndId=" + nodeId +
                            ", req=" + req + ']'),
                        false);

                    return;
                }

                clsLdr = dep.classLoader();
            }

            Collection<Map.Entry<K, V>> col;
            IgniteDataLoadCacheUpdater<K, V> updater;

            try {
                col = marsh.unmarshal(req.collectionBytes(), clsLdr);
                updater = marsh.unmarshal(req.updaterBytes(), clsLdr);
            }
            catch (IgniteCheckedException e) {
                U.error(log, "Failed to unmarshal message [nodeId=" + nodeId + ", req=" + req + ']', e);

                sendResponse(nodeId, topic, req.requestId(), e, false);

                return;
            }

            GridDataLoadUpdateJob<K, V> job = new GridDataLoadUpdateJob<>(ctx,
                log,
                req.cacheName(),
                col,
                req.ignoreDeploymentOwnership(),
                req.skipStore(),
                updater);

            Exception err = null;

            try {
                job.call();
            }
            catch (Exception e) {
                U.error(log, "Failed to finish update job.", e);

                err = e;
            }

            sendResponse(nodeId, topic, req.requestId(), err, req.forceLocalDeployment());
        }
        finally {
            busyLock.leaveBusy();
        }
    }

    /**
     * Marshals the error, if any, and sends a response to the given node. If the error
     * cannot be marshalled the failure is logged and no response is sent. A send failure
     * is logged as an error only while the target node is still alive.
     *
     * @param nodeId Node ID.
     * @param resTopic Response topic.
     * @param reqId Request ID.
     * @param err Error.
     * @param forceLocDep Force local deployment.
     */
    private void sendResponse(UUID nodeId, Object resTopic, long reqId, @Nullable Throwable err,
        boolean forceLocDep) {
        byte[] errBytes;

        try {
            errBytes = err != null ? marsh.marshal(err) : null;
        }
        catch (IgniteCheckedException e) {
            U.error(log, "Failed to marshal message.", e);

            return;
        }

        GridDataLoadResponse res = new GridDataLoadResponse(reqId, errBytes, forceLocDep);

        try {
            ctx.io().send(nodeId, resTopic, res, PUBLIC_POOL);
        }
        catch (IgniteCheckedException e) {
            if (ctx.discovery().alive(nodeId))
                U.error(log, "Failed to respond to node [nodeId=" + nodeId + ", res=" + res + ']', e);
            else if (log.isDebugEnabled())
                log.debug("Node has left the grid: " + nodeId);
        }
    }

    /** {@inheritDoc} */
    @Override public void printMemoryStats() {
        X.println(">>>");
        X.println(">>> Data loader processor memory stats [grid=" + ctx.gridName() + ']');
        X.println(">>>   ldrsSize: " + ldrs.size());
    }
}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/IgniteDataLoaderImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/IgniteDataLoaderImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/IgniteDataLoaderImpl.java deleted file mode 100644 index 9f849db..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/IgniteDataLoaderImpl.java +++ /dev/null @@ -1,1346 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.dataload; - -import org.apache.ignite.*; -import org.apache.ignite.cache.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.dataload.*; -import org.apache.ignite.events.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.util.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.product.*; -import org.apache.ignite.internal.managers.communication.*; -import org.apache.ignite.internal.managers.deployment.*; -import org.apache.ignite.internal.managers.eventstorage.*; -import org.gridgain.grid.kernal.processors.cache.*; -import org.apache.ignite.internal.util.future.*; -import org.apache.ignite.internal.util.lang.*; -import org.apache.ignite.internal.util.tostring.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.jdk8.backport.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.util.*; -import java.util.Map.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; - -import static org.apache.ignite.events.IgniteEventType.*; -import static org.apache.ignite.internal.GridNodeAttributes.*; -import static org.apache.ignite.internal.GridTopic.*; -import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*; - -/** - * Data loader implementation. 
- */ -public class IgniteDataLoaderImpl<K, V> implements IgniteDataLoader<K, V>, Delayed { - /** */ - public static final IgniteProductVersion COMPACT_MAP_ENTRIES_SINCE = IgniteProductVersion.fromString("1.0.0"); - - /** Cache updater. */ - private IgniteDataLoadCacheUpdater<K, V> updater = GridDataLoadCacheUpdaters.individual(); - - /** */ - private byte[] updaterBytes; - - /** Max remap count before issuing an error. */ - private static final int MAX_REMAP_CNT = 32; - - /** Log reference. */ - private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>(); - - /** Cache name ({@code null} for default cache). */ - private final String cacheName; - - /** Portable enabled flag. */ - private final boolean portableEnabled; - - /** - * If {@code true} then data will be transferred in compact format (only keys and values). - * Otherwise full map entry will be transferred (this is requires by DR internal logic). - */ - private final boolean compact; - - /** Per-node buffer size. */ - @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") - private int bufSize = DFLT_PER_NODE_BUFFER_SIZE; - - /** */ - private int parallelOps = DFLT_MAX_PARALLEL_OPS; - - /** */ - private long autoFlushFreq; - - /** Mapping. */ - @GridToStringInclude - private ConcurrentMap<UUID, Buffer> bufMappings = new ConcurrentHashMap8<>(); - - /** Logger. */ - private IgniteLogger log; - - /** Discovery listener. */ - private final GridLocalEventListener discoLsnr; - - /** Context. */ - private final GridKernalContext ctx; - - /** Communication topic for responses. */ - private final Object topic; - - /** */ - private byte[] topicBytes; - - /** {@code True} if data loader has been cancelled. */ - private volatile boolean cancelled; - - /** Active futures of this data loader. */ - @GridToStringInclude - private final Collection<IgniteFuture<?>> activeFuts = new GridConcurrentHashSet<>(); - - /** Closure to remove from active futures. 
*/ - @GridToStringExclude - private final IgniteInClosure<IgniteFuture<?>> rmvActiveFut = new IgniteInClosure<IgniteFuture<?>>() { - @Override public void apply(IgniteFuture<?> t) { - boolean rmv = activeFuts.remove(t); - - assert rmv; - } - }; - - /** Job peer deploy aware. */ - private volatile GridPeerDeployAware jobPda; - - /** Deployment class. */ - private Class<?> depCls; - - /** Future to track loading finish. */ - private final GridFutureAdapter<?> fut; - - /** Busy lock. */ - private final GridSpinBusyLock busyLock = new GridSpinBusyLock(); - - /** Closed flag. */ - private final AtomicBoolean closed = new AtomicBoolean(); - - /** */ - private volatile long lastFlushTime = U.currentTimeMillis(); - - /** */ - private final DelayQueue<IgniteDataLoaderImpl<K, V>> flushQ; - - /** */ - private boolean skipStore; - - /** - * @param ctx Grid kernal context. - * @param cacheName Cache name. - * @param flushQ Flush queue. - * @param compact If {@code true} data is transferred in compact mode (only keys and values). - * Otherwise full map entry will be transferred (this is required by DR internal logic). - */ - public IgniteDataLoaderImpl( - final GridKernalContext ctx, - @Nullable final String cacheName, - DelayQueue<IgniteDataLoaderImpl<K, V>> flushQ, - boolean compact - ) { - assert ctx != null; - - this.ctx = ctx; - this.cacheName = cacheName; - this.flushQ = flushQ; - this.compact = compact; - - log = U.logger(ctx, logRef, IgniteDataLoaderImpl.class); - - ClusterNode node = F.first(ctx.grid().forCache(cacheName).nodes()); - - if (node == null) - throw new IllegalStateException("Cache doesn't exist: " + cacheName); - - Map<String, Boolean> attrPortable = node.attribute(ATTR_CACHE_PORTABLE); - - Boolean portableEnabled0 = attrPortable == null ? null : attrPortable.get(CU.mask(cacheName)); - - portableEnabled = portableEnabled0 == null ? 
false : portableEnabled0; - - discoLsnr = new GridLocalEventListener() { - @Override public void onEvent(IgniteEvent evt) { - assert evt.type() == EVT_NODE_FAILED || evt.type() == EVT_NODE_LEFT; - - IgniteDiscoveryEvent discoEvt = (IgniteDiscoveryEvent)evt; - - UUID id = discoEvt.eventNode().id(); - - // Remap regular mappings. - final Buffer buf = bufMappings.remove(id); - - if (buf != null) { - // Only async notification is possible since - // discovery thread may be trapped otherwise. - ctx.closure().callLocalSafe( - new Callable<Object>() { - @Override public Object call() throws Exception { - buf.onNodeLeft(); - - return null; - } - }, - true /* system pool */ - ); - } - } - }; - - ctx.event().addLocalEventListener(discoLsnr, EVT_NODE_FAILED, EVT_NODE_LEFT); - - // Generate unique topic for this loader. - topic = TOPIC_DATALOAD.topic(IgniteUuid.fromUuid(ctx.localNodeId())); - - ctx.io().addMessageListener(topic, new GridMessageListener() { - @Override public void onMessage(UUID nodeId, Object msg) { - assert msg instanceof GridDataLoadResponse; - - GridDataLoadResponse res = (GridDataLoadResponse)msg; - - if (log.isDebugEnabled()) - log.debug("Received data load response: " + res); - - Buffer buf = bufMappings.get(nodeId); - - if (buf != null) - buf.onResponse(res); - - else if (log.isDebugEnabled()) - log.debug("Ignoring response since node has left [nodeId=" + nodeId + ", "); - } - }); - - if (log.isDebugEnabled()) - log.debug("Added response listener within topic: " + topic); - - fut = new GridDataLoaderFuture(ctx, this); - } - - /** - * Enters busy lock. - */ - private void enterBusy() { - if (!busyLock.enterBusy()) - throw new IllegalStateException("Data loader has been closed."); - } - - /** - * Leaves busy lock. 
- */ - private void leaveBusy() { - busyLock.leaveBusy(); - } - - /** {@inheritDoc} */ - @Override public IgniteFuture<?> future() { - return fut; - } - - /** {@inheritDoc} */ - @Override public void deployClass(Class<?> depCls) { - this.depCls = depCls; - } - - /** {@inheritDoc} */ - @Override public void updater(IgniteDataLoadCacheUpdater<K, V> updater) { - A.notNull(updater, "updater"); - - this.updater = updater; - } - - /** {@inheritDoc} */ - @Override public boolean isolated() { - return updater != GridDataLoadCacheUpdaters.individual(); - } - - /** {@inheritDoc} */ - @Override public void isolated(boolean isolated) throws IgniteCheckedException { - if (isolated()) - return; - - ClusterNode node = F.first(ctx.grid().forCache(cacheName).nodes()); - - if (node == null) - throw new IgniteCheckedException("Failed to get node for cache: " + cacheName); - - GridCacheAttributes a = U.cacheAttributes(node, cacheName); - - assert a != null; - - updater = a.atomicityMode() == GridCacheAtomicityMode.ATOMIC ? 
- GridDataLoadCacheUpdaters.<K, V>batched() : - GridDataLoadCacheUpdaters.<K, V>groupLocked(); - } - - /** {@inheritDoc} */ - @Override public boolean skipStore() { - return skipStore; - } - - /** {@inheritDoc} */ - @Override public void skipStore(boolean skipStore) { - this.skipStore = skipStore; - } - - /** {@inheritDoc} */ - @Override @Nullable public String cacheName() { - return cacheName; - } - - /** {@inheritDoc} */ - @Override public int perNodeBufferSize() { - return bufSize; - } - - /** {@inheritDoc} */ - @Override public void perNodeBufferSize(int bufSize) { - A.ensure(bufSize > 0, "bufSize > 0"); - - this.bufSize = bufSize; - } - - /** {@inheritDoc} */ - @Override public int perNodeParallelLoadOperations() { - return parallelOps; - } - - /** {@inheritDoc} */ - @Override public void perNodeParallelLoadOperations(int parallelOps) { - this.parallelOps = parallelOps; - } - - /** {@inheritDoc} */ - @Override public long autoFlushFrequency() { - return autoFlushFreq; - } - - /** {@inheritDoc} */ - @Override public void autoFlushFrequency(long autoFlushFreq) { - A.ensure(autoFlushFreq >= 0, "autoFlushFreq >= 0"); - - long old = this.autoFlushFreq; - - if (autoFlushFreq != old) { - this.autoFlushFreq = autoFlushFreq; - - if (autoFlushFreq != 0 && old == 0) - flushQ.add(this); - else if (autoFlushFreq == 0) - flushQ.remove(this); - } - } - - /** {@inheritDoc} */ - @Override public IgniteFuture<?> addData(Map<K, V> entries) throws IllegalStateException { - A.notNull(entries, "entries"); - - return addData(entries.entrySet()); - } - - /** {@inheritDoc} */ - @Override public IgniteFuture<?> addData(Collection<? 
extends Map.Entry<K, V>> entries) { - A.notEmpty(entries, "entries"); - - enterBusy(); - - try { - GridFutureAdapter<Object> resFut = new GridFutureAdapter<>(ctx); - - activeFuts.add(resFut); - - resFut.listenAsync(rmvActiveFut); - - Collection<K> keys = new GridConcurrentHashSet<>(entries.size(), 1.0f, 16); - - for (Map.Entry<K, V> entry : entries) - keys.add(entry.getKey()); - - load0(entries, resFut, keys, 0); - - return resFut; - } - catch (IgniteException e) { - return new GridFinishedFuture<>(ctx, e); - } - finally { - leaveBusy(); - } - } - - /** {@inheritDoc} */ - @Override public IgniteFuture<?> addData(Map.Entry<K, V> entry) throws IgniteCheckedException, IllegalStateException { - A.notNull(entry, "entry"); - - return addData(F.asList(entry)); - } - - /** {@inheritDoc} */ - @Override public IgniteFuture<?> addData(K key, V val) throws IgniteCheckedException, IllegalStateException { - A.notNull(key, "key"); - - return addData(new Entry0<>(key, val)); - } - - /** {@inheritDoc} */ - @Override public IgniteFuture<?> removeData(K key) throws IgniteCheckedException, IllegalStateException { - return addData(key, null); - } - - /** - * @param entries Entries. - * @param resFut Result future. - * @param activeKeys Active keys. - * @param remaps Remaps count. - */ - private void load0( - Collection<? 
extends Map.Entry<K, V>> entries, - final GridFutureAdapter<Object> resFut, - final Collection<K> activeKeys, - final int remaps - ) { - assert entries != null; - - if (remaps >= MAX_REMAP_CNT) { - resFut.onDone(new IgniteCheckedException("Failed to finish operation (too many remaps): " + remaps)); - - return; - } - - Map<ClusterNode, Collection<Map.Entry<K, V>>> mappings = new HashMap<>(); - - boolean initPda = ctx.deploy().enabled() && jobPda == null; - - for (Map.Entry<K, V> entry : entries) { - ClusterNode node; - - try { - K key = entry.getKey(); - - assert key != null; - - if (initPda) { - jobPda = new DataLoaderPda(key, entry.getValue(), updater); - - initPda = false; - } - - node = ctx.affinity().mapKeyToNode(cacheName, key); - } - catch (IgniteCheckedException e) { - resFut.onDone(e); - - return; - } - - if (node == null) { - resFut.onDone(new ClusterTopologyException("Failed to map key to node " + - "(no nodes with cache found in topology) [infos=" + entries.size() + - ", cacheName=" + cacheName + ']')); - - return; - } - - Collection<Map.Entry<K, V>> col = mappings.get(node); - - if (col == null) - mappings.put(node, col = new ArrayList<>()); - - col.add(entry); - } - - for (final Map.Entry<ClusterNode, Collection<Map.Entry<K, V>>> e : mappings.entrySet()) { - final UUID nodeId = e.getKey().id(); - - Buffer buf = bufMappings.get(nodeId); - - if (buf == null) { - Buffer old = bufMappings.putIfAbsent(nodeId, buf = new Buffer(e.getKey())); - - if (old != null) - buf = old; - } - - final Collection<Map.Entry<K, V>> entriesForNode = e.getValue(); - - IgniteInClosure<IgniteFuture<?>> lsnr = new IgniteInClosure<IgniteFuture<?>>() { - @Override public void apply(IgniteFuture<?> t) { - try { - t.get(); - - for (Map.Entry<K, V> e : entriesForNode) - activeKeys.remove(e.getKey()); - - if (activeKeys.isEmpty()) - resFut.onDone(); - } - catch (IgniteCheckedException e1) { - if (log.isDebugEnabled()) - log.debug("Future finished with error [nodeId=" + nodeId + ", 
err=" + e1 + ']'); - - if (cancelled) { - resFut.onDone(new IgniteCheckedException("Data loader has been cancelled: " + - IgniteDataLoaderImpl.this, e1)); - } - else - load0(entriesForNode, resFut, activeKeys, remaps + 1); - } - } - }; - - GridFutureAdapter<?> f; - - try { - f = buf.update(entriesForNode, lsnr); - } - catch (IgniteInterruptedException e1) { - resFut.onDone(e1); - - return; - } - - if (ctx.discovery().node(nodeId) == null) { - if (bufMappings.remove(nodeId, buf)) - buf.onNodeLeft(); - - if (f != null) - f.onDone(new ClusterTopologyException("Failed to wait for request completion " + - "(node has left): " + nodeId)); - } - } - } - - /** - * Performs flush. - * - * @throws IgniteCheckedException If failed. - */ - private void doFlush() throws IgniteCheckedException { - lastFlushTime = U.currentTimeMillis(); - - List<IgniteFuture> activeFuts0 = null; - - int doneCnt = 0; - - for (IgniteFuture<?> f : activeFuts) { - if (!f.isDone()) { - if (activeFuts0 == null) - activeFuts0 = new ArrayList<>((int)(activeFuts.size() * 1.2)); - - activeFuts0.add(f); - } - else { - f.get(); - - doneCnt++; - } - } - - if (activeFuts0 == null || activeFuts0.isEmpty()) - return; - - while (true) { - Queue<IgniteFuture<?>> q = null; - - for (Buffer buf : bufMappings.values()) { - IgniteFuture<?> flushFut = buf.flush(); - - if (flushFut != null) { - if (q == null) - q = new ArrayDeque<>(bufMappings.size() * 2); - - q.add(flushFut); - } - } - - if (q != null) { - assert !q.isEmpty(); - - boolean err = false; - - for (IgniteFuture fut = q.poll(); fut != null; fut = q.poll()) { - try { - fut.get(); - } - catch (IgniteCheckedException e) { - if (log.isDebugEnabled()) - log.debug("Failed to flush buffer: " + e); - - err = true; - } - } - - if (err) - // Remaps needed - flush buffers. 
- continue; - } - - doneCnt = 0; - - for (int i = 0; i < activeFuts0.size(); i++) { - IgniteFuture f = activeFuts0.get(i); - - if (f == null) - doneCnt++; - else if (f.isDone()) { - f.get(); - - doneCnt++; - - activeFuts0.set(i, null); - } - else - break; - } - - if (doneCnt == activeFuts0.size()) - return; - } - } - - /** {@inheritDoc} */ - @SuppressWarnings("ForLoopReplaceableByForEach") - @Override public void flush() throws IgniteCheckedException { - enterBusy(); - - try { - doFlush(); - } - finally { - leaveBusy(); - } - } - - /** - * Flushes every internal buffer if buffer was flushed before passed in - * threshold. - * <p> - * Does not wait for result and does not fail on errors assuming that this method - * should be called periodically. - */ - @Override public void tryFlush() throws IgniteInterruptedException { - if (!busyLock.enterBusy()) - return; - - try { - for (Buffer buf : bufMappings.values()) - buf.flush(); - - lastFlushTime = U.currentTimeMillis(); - } - finally { - leaveBusy(); - } - } - - /** - * @param cancel {@code True} to close with cancellation. - * @throws IgniteCheckedException If failed. - */ - @Override public void close(boolean cancel) throws IgniteCheckedException { - if (!closed.compareAndSet(false, true)) - return; - - busyLock.block(); - - if (log.isDebugEnabled()) - log.debug("Closing data loader [ldr=" + this + ", cancel=" + cancel + ']'); - - IgniteCheckedException e = null; - - try { - // Assuming that no methods are called on this loader after this method is called. - if (cancel) { - cancelled = true; - - for (Buffer buf : bufMappings.values()) - buf.cancelAll(); - } - else - doFlush(); - - ctx.event().removeLocalEventListener(discoLsnr); - - ctx.io().removeMessageListener(topic); - } - catch (IgniteCheckedException e0) { - e = e0; - } - - fut.onDone(null, e); - - if (e != null) - throw e; - } - - /** - * @return {@code true} If the loader is closed. 
- */ - boolean isClosed() { - return fut.isDone(); - } - - /** {@inheritDoc} */ - @Override public void close() throws IgniteCheckedException { - close(false); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(IgniteDataLoaderImpl.class, this); - } - - /** {@inheritDoc} */ - @Override public long getDelay(TimeUnit unit) { - return unit.convert(nextFlushTime() - U.currentTimeMillis(), TimeUnit.MILLISECONDS); - } - - /** - * @return Next flush time. - */ - private long nextFlushTime() { - return lastFlushTime + autoFlushFreq; - } - - /** {@inheritDoc} */ - @Override public int compareTo(Delayed o) { - return nextFlushTime() > ((IgniteDataLoaderImpl)o).nextFlushTime() ? 1 : -1; - } - - /** - * - */ - private class Buffer { - /** Node. */ - private final ClusterNode node; - - /** Active futures. */ - private final Collection<IgniteFuture<Object>> locFuts; - - /** Buffered entries. */ - private List<Map.Entry<K, V>> entries; - - /** */ - @GridToStringExclude - private GridFutureAdapter<Object> curFut; - - /** Local node flag. */ - private final boolean isLocNode; - - /** ID generator. */ - private final AtomicLong idGen = new AtomicLong(); - - /** Active futures. */ - private final ConcurrentMap<Long, GridFutureAdapter<Object>> reqs; - - /** */ - private final Semaphore sem; - - /** Closure to signal on task finish. */ - @GridToStringExclude - private final IgniteInClosure<IgniteFuture<Object>> signalC = new IgniteInClosure<IgniteFuture<Object>>() { - @Override public void apply(IgniteFuture<Object> t) { - signalTaskFinished(t); - } - }; - - /** - * @param node Node. - */ - Buffer(ClusterNode node) { - assert node != null; - - this.node = node; - - locFuts = new GridConcurrentHashSet<>(); - reqs = new ConcurrentHashMap8<>(); - - // Cache local node flag. 
- isLocNode = node.equals(ctx.discovery().localNode()); - - entries = newEntries(); - curFut = new GridFutureAdapter<>(ctx); - curFut.listenAsync(signalC); - - sem = new Semaphore(parallelOps); - } - - /** - * @param newEntries Infos. - * @param lsnr Listener for the operation future. - * @throws org.apache.ignite.IgniteInterruptedException If failed. - * @return Future for operation. - */ - @Nullable GridFutureAdapter<?> update(Iterable<Map.Entry<K, V>> newEntries, - IgniteInClosure<IgniteFuture<?>> lsnr) throws IgniteInterruptedException { - List<Map.Entry<K, V>> entries0 = null; - GridFutureAdapter<Object> curFut0; - - synchronized (this) { - curFut0 = curFut; - - curFut0.listenAsync(lsnr); - - for (Map.Entry<K, V> entry : newEntries) - entries.add(entry); - - if (entries.size() >= bufSize) { - entries0 = entries; - - entries = newEntries(); - curFut = new GridFutureAdapter<>(ctx); - curFut.listenAsync(signalC); - } - } - - if (entries0 != null) { - submit(entries0, curFut0); - - if (cancelled) - curFut0.onDone(new IgniteCheckedException("Data loader has been cancelled: " + IgniteDataLoaderImpl.this)); - } - - return curFut0; - } - - /** - * @return Fresh collection with some space for outgrowth. - */ - private List<Map.Entry<K, V>> newEntries() { - return new ArrayList<>((int)(bufSize * 1.2)); - } - - /** - * @return Future if any submitted. - * - * @throws org.apache.ignite.IgniteInterruptedException If thread has been interrupted. - */ - @Nullable - IgniteFuture<?> flush() throws IgniteInterruptedException { - List<Map.Entry<K, V>> entries0 = null; - GridFutureAdapter<Object> curFut0 = null; - - synchronized (this) { - if (!entries.isEmpty()) { - entries0 = entries; - curFut0 = curFut; - - entries = newEntries(); - curFut = new GridFutureAdapter<>(ctx); - curFut.listenAsync(signalC); - } - } - - if (entries0 != null) - submit(entries0, curFut0); - - // Create compound future for this flush. 
- GridCompoundFuture<Object, Object> res = null; - - for (IgniteFuture<Object> f : locFuts) { - if (res == null) - res = new GridCompoundFuture<>(ctx); - - res.add(f); - } - - for (IgniteFuture<Object> f : reqs.values()) { - if (res == null) - res = new GridCompoundFuture<>(ctx); - - res.add(f); - } - - if (res != null) - res.markInitialized(); - - return res; - } - - /** - * Increments active tasks count. - * - * @throws org.apache.ignite.IgniteInterruptedException If thread has been interrupted. - */ - private void incrementActiveTasks() throws IgniteInterruptedException { - U.acquire(sem); - } - - /** - * @param f Future that finished. - */ - private void signalTaskFinished(IgniteFuture<Object> f) { - assert f != null; - - sem.release(); - } - - /** - * @param entries Entries to submit. - * @param curFut Current future. - * @throws org.apache.ignite.IgniteInterruptedException If interrupted. - */ - private void submit(final Collection<Map.Entry<K, V>> entries, final GridFutureAdapter<Object> curFut) - throws IgniteInterruptedException { - assert entries != null; - assert !entries.isEmpty(); - assert curFut != null; - - incrementActiveTasks(); - - IgniteFuture<Object> fut; - - if (isLocNode) { - fut = ctx.closure().callLocalSafe( - new GridDataLoadUpdateJob<>(ctx, log, cacheName, entries, false, skipStore, updater), false); - - locFuts.add(fut); - - fut.listenAsync(new IgniteInClosure<IgniteFuture<Object>>() { - @Override public void apply(IgniteFuture<Object> t) { - try { - boolean rmv = locFuts.remove(t); - - assert rmv; - - curFut.onDone(t.get()); - } - catch (IgniteCheckedException e) { - curFut.onDone(e); - } - } - }); - } - else { - byte[] entriesBytes; - - try { - if (compact) { - if (node.version().compareTo(COMPACT_MAP_ENTRIES_SINCE) < 0) { - Collection<Map.Entry<K, V>> entries0 = new ArrayList<>(entries.size()); - - GridPortableProcessor portable = ctx.portable(); - - for (Map.Entry<K, V> entry : entries) - entries0.add(new Entry0<>( - portableEnabled ? 
(K)portable.marshalToPortable(entry.getKey()) : entry.getKey(), - portableEnabled ? (V)portable.marshalToPortable(entry.getValue()) : entry.getValue())); - - entriesBytes = ctx.config().getMarshaller().marshal(entries0); - } - else - entriesBytes = ctx.config().getMarshaller() - .marshal(new Entries0<>(entries, portableEnabled ? ctx.portable() : null)); - } - else - entriesBytes = ctx.config().getMarshaller().marshal(entries); - - if (updaterBytes == null) { - assert updater != null; - - updaterBytes = ctx.config().getMarshaller().marshal(updater); - } - - if (topicBytes == null) - topicBytes = ctx.config().getMarshaller().marshal(topic); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to marshal (request will not be sent).", e); - - return; - } - - GridDeployment dep = null; - GridPeerDeployAware jobPda0 = null; - - if (ctx.deploy().enabled()) { - try { - jobPda0 = jobPda; - - assert jobPda0 != null; - - dep = ctx.deploy().deploy(jobPda0.deployClass(), jobPda0.classLoader()); - - GridCacheAdapter<Object, Object> cache = ctx.cache().internalCache(cacheName); - - if (cache != null) - cache.context().deploy().onEnter(); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to deploy class (request will not be sent): " + jobPda0.deployClass(), e); - - return; - } - - if (dep == null) - U.warn(log, "Failed to deploy class (request will be sent): " + jobPda0.deployClass()); - } - - long reqId = idGen.incrementAndGet(); - - fut = curFut; - - reqs.put(reqId, (GridFutureAdapter<Object>)fut); - - GridDataLoadRequest req = new GridDataLoadRequest( - reqId, - topicBytes, - cacheName, - updaterBytes, - entriesBytes, - true, - skipStore, - dep != null ? dep.deployMode() : null, - dep != null ? jobPda0.deployClass().getName() : null, - dep != null ? dep.userVersion() : null, - dep != null ? dep.participants() : null, - dep != null ? 
dep.classLoaderId() : null, - dep == null); - - try { - ctx.io().send(node, TOPIC_DATALOAD, req, PUBLIC_POOL); - - if (log.isDebugEnabled()) - log.debug("Sent request to node [nodeId=" + node.id() + ", req=" + req + ']'); - } - catch (IgniteCheckedException e) { - if (ctx.discovery().alive(node) && ctx.discovery().pingNode(node.id())) - ((GridFutureAdapter<Object>)fut).onDone(e); - else - ((GridFutureAdapter<Object>)fut).onDone(new ClusterTopologyException("Failed to send " + - "request (node has left): " + node.id())); - } - } - } - - /** - * - */ - void onNodeLeft() { - assert !isLocNode; - assert bufMappings.get(node.id()) != this; - - if (log.isDebugEnabled()) - log.debug("Forcibly completing futures (node has left): " + node.id()); - - Exception e = new ClusterTopologyException("Failed to wait for request completion " + - "(node has left): " + node.id()); - - for (GridFutureAdapter<Object> f : reqs.values()) - f.onDone(e); - - // Make sure to complete current future. - GridFutureAdapter<Object> curFut0; - - synchronized (this) { - curFut0 = curFut; - } - - curFut0.onDone(e); - } - - /** - * @param res Response. - */ - void onResponse(GridDataLoadResponse res) { - if (log.isDebugEnabled()) - log.debug("Received data load response: " + res); - - GridFutureAdapter<?> f = reqs.remove(res.requestId()); - - if (f == null) { - if (log.isDebugEnabled()) - log.debug("Future for request has not been found: " + res.requestId()); - - return; - } - - Throwable err = null; - - byte[] errBytes = res.errorBytes(); - - if (errBytes != null) { - try { - GridPeerDeployAware jobPda0 = jobPda; - - err = ctx.config().getMarshaller().unmarshal( - errBytes, - jobPda0 != null ? 
jobPda0.classLoader() : U.gridClassLoader()); - } - catch (IgniteCheckedException e) { - f.onDone(null, new IgniteCheckedException("Failed to unmarshal response.", e)); - - return; - } - } - - f.onDone(null, err); - - if (log.isDebugEnabled()) - log.debug("Finished future [fut=" + f + ", reqId=" + res.requestId() + ", err=" + err + ']'); - } - - /** - * - */ - void cancelAll() { - IgniteCheckedException err = new IgniteCheckedException("Data loader has been cancelled: " + IgniteDataLoaderImpl.this); - - for (IgniteFuture<?> f : locFuts) { - try { - f.cancel(); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to cancel mini-future.", e); - } - } - - for (GridFutureAdapter<?> f : reqs.values()) - f.onDone(err); - } - - /** {@inheritDoc} */ - @Override public String toString() { - int size; - - synchronized (this) { - size = entries.size(); - } - - return S.toString(Buffer.class, this, - "entriesCnt", size, - "locFutsSize", locFuts.size(), - "reqsSize", reqs.size()); - } - } - - /** - * Data loader peer-deploy aware. - */ - private class DataLoaderPda implements GridPeerDeployAware { - /** */ - private static final long serialVersionUID = 0L; - - /** Deploy class. */ - private Class<?> cls; - - /** Class loader. */ - private ClassLoader ldr; - - /** Collection of objects to detect deploy class and class loader. */ - private Collection<Object> objs; - - /** - * Constructs data loader peer-deploy aware. - * - * @param objs Collection of objects to detect deploy class and class loader. - */ - private DataLoaderPda(Object... 
objs) { - this.objs = Arrays.asList(objs); - } - - /** {@inheritDoc} */ - @Override public Class<?> deployClass() { - if (cls == null) { - Class<?> cls0 = null; - - if (depCls != null) - cls0 = depCls; - else { - for (Iterator<Object> it = objs.iterator(); (cls0 == null || U.isJdk(cls0)) && it.hasNext();) { - Object o = it.next(); - - if (o != null) - cls0 = U.detectClass(o); - } - - if (cls0 == null || U.isJdk(cls0)) - cls0 = IgniteDataLoaderImpl.class; - } - - assert cls0 != null : "Failed to detect deploy class [objs=" + objs + ']'; - - cls = cls0; - } - - return cls; - } - - /** {@inheritDoc} */ - @Override public ClassLoader classLoader() { - if (ldr == null) { - ClassLoader ldr0 = deployClass().getClassLoader(); - - // Safety. - if (ldr0 == null) - ldr0 = U.gridClassLoader(); - - assert ldr0 != null : "Failed to detect classloader [objs=" + objs + ']'; - - ldr = ldr0; - } - - return ldr; - } - } - - /** - * Entry. - */ - private static class Entry0<K, V> implements Map.Entry<K, V>, Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - private K key; - - /** */ - private V val; - - /** - * @param key Key. - * @param val Value. - */ - private Entry0(K key, @Nullable V val) { - assert key != null; - - this.key = key; - this.val = val; - } - - /** - * For {@link Externalizable}. - */ - @SuppressWarnings("UnusedDeclaration") - public Entry0() { - // No-op. 
- } - - /** {@inheritDoc} */ - @Override public K getKey() { - return key; - } - - /** {@inheritDoc} */ - @Override public V getValue() { - return val; - } - - /** {@inheritDoc} */ - @Override public V setValue(V val) { - V old = this.val; - - this.val = val; - - return old; - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeObject(key); - out.writeObject(val); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - key = (K)in.readObject(); - val = (V)in.readObject(); - } - } - - /** - * Wrapper list with special compact serialization of map entries. - */ - private static class Entries0<K, V> extends AbstractCollection<Map.Entry<K, V>> implements Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** Wrapped delegate. */ - private Collection<Map.Entry<K, V>> delegate; - - /** Optional portable processor for converting values. */ - private GridPortableProcessor portable; - - /** - * @param delegate Delegate. - * @param portable Portable processor. - */ - private Entries0(Collection<Map.Entry<K, V>> delegate, GridPortableProcessor portable) { - this.delegate = delegate; - this.portable = portable; - } - - /** - * For {@link Externalizable}. - */ - public Entries0() { - // No-op. 
- } - - /** {@inheritDoc} */ - @Override public Iterator<Entry<K, V>> iterator() { - return delegate.iterator(); - } - - /** {@inheritDoc} */ - @Override public int size() { - return delegate.size(); - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeInt(delegate.size()); - - boolean portableEnabled = portable != null; - - for (Map.Entry<K, V> entry : delegate) { - if (portableEnabled) { - out.writeObject(portable.marshalToPortable(entry.getKey())); - out.writeObject(portable.marshalToPortable(entry.getValue())); - } - else { - out.writeObject(entry.getKey()); - out.writeObject(entry.getValue()); - } - } - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - int sz = in.readInt(); - - delegate = new ArrayList<>(sz); - - for (int i = 0; i < sz; i++) { - Object k = in.readObject(); - Object v = in.readObject(); - - delegate.add(new Entry0<>((K)k, (V)v)); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/package.html ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/package.html b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/package.html deleted file mode 100644 index 50a90ff..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/dataload/package.html +++ /dev/null @@ -1,23 +0,0 @@ -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. 
You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - --> -<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> -<html> -<body> - <!-- Package description. --> - Data loader processor. -</body> -</html> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/test/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessorAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessorAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessorAbstractSelfTest.java new file mode 100644 index 0000000..71ea41f --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessorAbstractSelfTest.java @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.affinity; + +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.affinity.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.gridgain.testframework.*; +import org.gridgain.testframework.junits.common.*; + +import java.util.*; +import java.util.concurrent.*; + +import static org.apache.ignite.cache.GridCacheMode.*; + +/** + * Tests for {@link GridAffinityProcessor}. + */ +@GridCommonTest(group = "Affinity Processor") +public abstract class GridAffinityProcessorAbstractSelfTest extends GridCommonAbstractTest { + /** Number of grids started for tests. Should not be less than 2. */ + private static final int NODES_CNT = 3; + + /** Cache name. */ + private static final String CACHE_NAME = "cache"; + + /** IP finder. */ + private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** Flag to start grid with cache. 
*/ + private boolean withCache; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); + + discoSpi.setIpFinder(ipFinder); + + cfg.setDiscoverySpi(discoSpi); + + if (withCache) { + CacheConfiguration cacheCfg = defaultCacheConfiguration(); + + cacheCfg.setName(CACHE_NAME); + cacheCfg.setCacheMode(PARTITIONED); + cacheCfg.setBackups(1); + cacheCfg.setAffinity(affinityFunction()); + + cfg.setCacheConfiguration(cacheCfg); + } + + return cfg; + } + + /** + * Creates affinity function for test. + * + * @return Affinity function. + */ + protected abstract GridCacheAffinityFunction affinityFunction(); + + /** {@inheritDoc} */ + @SuppressWarnings({"ConstantConditions"}) + @Override protected void beforeTestsStarted() throws Exception { + assert NODES_CNT >= 1; + + withCache = false; + + for (int i = 0; i < NODES_CNT; i++) + startGrid(i); + + withCache = true; + + for (int i = NODES_CNT; i < 2 * NODES_CNT; i++) + startGrid(i); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + } + + /** + * Test affinity functions caching and clean up. + * + * @throws Exception In case of any exception. + */ + @SuppressWarnings("AssertEqualsBetweenInconvertibleTypes") + public void testAffinityProcessor() throws Exception { + Random rnd = new Random(); + + final GridKernal grid1 = (GridKernal)grid(rnd.nextInt(NODES_CNT)); // Without cache. + GridKernal grid2 = (GridKernal)grid(NODES_CNT + rnd.nextInt(NODES_CNT)); // With cache. 
+ + assertEquals(NODES_CNT * 2, grid1.nodes().size()); + assertEquals(NODES_CNT * 2, grid2.nodes().size()); + + GridTestUtils.assertThrows(log, new Callable<Void>() { + @Override public Void call() throws Exception { + grid1.cache(CACHE_NAME); + + return null; + } + }, IllegalArgumentException.class, null); + + GridCache<Integer, Integer> cache = grid2.cache(CACHE_NAME); + + assertNotNull(cache); + + GridAffinityProcessor affPrc1 = grid1.context().affinity(); + GridAffinityProcessor affPrc2 = grid2.context().affinity(); + + // Create keys collection. + Collection<Integer> keys = new ArrayList<>(1000); + + for (int i = 0; i < 1000; i++) + keys.add(i); + + // + // Validate affinity functions collection updated on first call. + // + + Map<ClusterNode, Collection<Integer>> node1Map = affPrc1.mapKeysToNodes(CACHE_NAME, keys); + Map<ClusterNode, Collection<Integer>> node2Map = affPrc2.mapKeysToNodes(CACHE_NAME, keys); + Map<ClusterNode, Collection<Integer>> cacheMap = cache.affinity().mapKeysToNodes(keys); + + assertEquals(cacheMap.size(), node1Map.size()); + assertEquals(cacheMap.size(), node2Map.size()); + + for (Map.Entry<ClusterNode, Collection<Integer>> entry : cacheMap.entrySet()) { + ClusterNode node = entry.getKey(); + + Collection<Integer> mappedKeys = entry.getValue(); + + Collection<Integer> mapped1 = node1Map.get(node); + Collection<Integer> mapped2 = node2Map.get(node); + + assertTrue(mappedKeys.containsAll(mapped1) && mapped1.containsAll(mappedKeys)); + assertTrue(mappedKeys.containsAll(mapped2) && mapped2.containsAll(mappedKeys)); + } + } + + /** + * Test performance of affinity processor. + * + * @throws Exception In case of any exception. 
+ */ + public void testPerformance() throws Exception { + GridKernal grid = (GridKernal)grid(0); + GridAffinityProcessor aff = grid.context().affinity(); + + int keysSize = 1000000; + + Collection<Integer> keys = new ArrayList<>(keysSize); + + for (int i = 0; i < keysSize; i++) + keys.add(i); + + long start = System.currentTimeMillis(); + + int iterations = 10000000; + + for (int i = 0; i < iterations; i++) + aff.mapKeyToNode(keys); + + long diff = System.currentTimeMillis() - start; + + info(">>> Map " + keysSize + " keys to " + grid.nodes().size() + " nodes " + iterations + " times in " + diff + "ms."); + + assertTrue(diff < 25000); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/test/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessorConsistentHashSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessorConsistentHashSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessorConsistentHashSelfTest.java new file mode 100644 index 0000000..0f8a49e --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessorConsistentHashSelfTest.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.affinity; + +import org.apache.ignite.cache.affinity.*; +import org.apache.ignite.cache.affinity.consistenthash.*; + +/** + * Tests consistent hash affinity function. + */ +public class GridAffinityProcessorConsistentHashSelfTest extends GridAffinityProcessorAbstractSelfTest { + /** {@inheritDoc} */ + @Override protected GridCacheAffinityFunction affinityFunction() { + return new GridCacheConsistentHashAffinityFunction(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/test/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessorRendezvousSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessorRendezvousSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessorRendezvousSelfTest.java new file mode 100644 index 0000000..fc1831e --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessorRendezvousSelfTest.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.affinity; + +import org.apache.ignite.cache.affinity.*; +import org.apache.ignite.cache.affinity.rendezvous.*; + +/** + * Tests affinity processor with rendezvous affinity function. + */ +public class GridAffinityProcessorRendezvousSelfTest extends GridAffinityProcessorAbstractSelfTest { + /** {@inheritDoc} */ + @Override protected GridCacheAffinityFunction affinityFunction() { + return new GridCacheRendezvousAffinityFunction(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java index 4542297..4fac368 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java @@ -21,7 +21,7 @@ import org.apache.ignite.*; import org.apache.ignite.cache.*; import org.apache.ignite.internal.*; import org.apache.ignite.lang.*; 
-import org.gridgain.grid.kernal.processors.continuous.*; +import org.apache.ignite.internal.processors.continuous.*; import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.gridgain.testframework.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/test/java/org/apache/ignite/internal/processors/closure/GridClosureProcessorRemoteTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/closure/GridClosureProcessorRemoteTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/closure/GridClosureProcessorRemoteTest.java new file mode 100644 index 0000000..ec27b86 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/closure/GridClosureProcessorRemoteTest.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.closure; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.internal.util.typedef.*; +import org.gridgain.testframework.junits.common.*; +import java.util.*; + +/** + * Tests execution of anonymous closures on remote nodes. + */ +@GridCommonTest(group = "Closure Processor") +public class GridClosureProcessorRemoteTest extends GridCommonAbstractTest { + /** + * Passes {@code true} to the superclass constructor so that the grid is started. + */ + public GridClosureProcessorRemoteTest() { + super(true); // Start grid. + } + + /** {@inheritDoc} */ + @Override public String getTestGridName() { + return null; + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration() throws Exception { + IgniteConfiguration cfg = new IgniteConfiguration(); + + cfg.setDiscoverySpi(new TcpDiscoverySpi()); + + return cfg; + } + + /** + * @throws Exception Thrown in case of failure. + */ + public void testAnonymousBroadcast() throws Exception { + Ignite g = grid(); + + assert g.cluster().nodes().size() >= 2; + + g.compute().run(new CA() { + @Override public void apply() { + System.out.println("BROADCASTING...."); + } + }); + + Thread.sleep(2000); + } + + /** + * @throws Exception Thrown in case of failure. + */ + public void testAnonymousUnicast() throws Exception { + Ignite g = grid(); + + assert g.cluster().nodes().size() >= 2; + + ClusterNode rmt = F.first(g.cluster().forRemotes().nodes()); + + compute(g.cluster().forNode(rmt)).run(new CA() { + @Override public void apply() { + System.out.println("UNICASTING...."); + } + }); + + Thread.sleep(2000); + } + + /** + * Has a remote node register a message listener via a closure, then sends it a test message. + * @throws Exception Thrown in case of failure. 
+ */ + public void testAnonymousUnicastRequest() throws Exception { + Ignite g = grid(); + + assert g.cluster().nodes().size() >= 2; + + ClusterNode rmt = F.first(g.cluster().forRemotes().nodes()); + final ClusterNode loc = g.cluster().localNode(); + + compute(g.cluster().forNode(rmt)).run(new CA() { + @Override public void apply() { + message(grid().forNode(loc)).localListen(new IgniteBiPredicate<UUID, String>() { + @Override public boolean apply(UUID uuid, String s) { + System.out.println("Received test message [nodeId: " + uuid + ", s=" + s + ']'); + + return false; + } + }, null); + } + }); + + message(g.cluster().forNode(rmt)).send(null, "TESTING..."); + + Thread.sleep(2000); + } +}
