http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServer.java new file mode 100644 index 0000000..7ff573a --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServer.java @@ -0,0 +1,427 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.igfs; + +import org.apache.ignite.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.igfs.common.*; +import org.apache.ignite.internal.util.ipc.*; +import org.apache.ignite.internal.util.ipc.loopback.*; +import org.apache.ignite.internal.util.ipc.shmem.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.internal.util.worker.*; +import org.apache.ignite.thread.*; +import org.jdk8.backport.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.util.*; + +import static org.apache.ignite.spi.IgnitePortProtocol.*; + +/** + * GGFS server. Handles requests passed from GGFS clients. + */ +public class IgfsServer { + /** GGFS context. */ + private final IgfsContext ggfsCtx; + + /** Logger. */ + private final IgniteLogger log; + + /** GGFS marshaller. */ + private final IgfsMarshaller marsh; + + /** Endpoint configuration. */ + private final Map<String,String> endpointCfg; + + /** Server endpoint. */ + private IpcServerEndpoint srvEndpoint; + + /** Server message handler. */ + private IgfsServerHandler hnd; + + /** Accept worker. */ + private AcceptWorker acceptWorker; + + /** Started client workers. */ + private ConcurrentLinkedDeque8<ClientWorker> clientWorkers = new ConcurrentLinkedDeque8<>(); + + /** Flag indicating if this a management endpoint. */ + private final boolean mgmt; + + /** + * Constructs ggfs server manager. + * @param ggfsCtx GGFS context. + * @param endpointCfg Endpoint configuration to start. + * @param mgmt Management flag - if true, server is intended to be started for Visor. + */ + public IgfsServer(IgfsContext ggfsCtx, Map<String, String> endpointCfg, boolean mgmt) { + assert ggfsCtx != null; + assert endpointCfg != null; + + this.endpointCfg = endpointCfg; + this.ggfsCtx = ggfsCtx; + this.mgmt = mgmt; + + log = ggfsCtx.kernalContext().log(IgfsServer.class); + + marsh = new IgfsMarshaller(); + } + + /** + * Starts this server. + * + * @throws IgniteCheckedException If failed. + */ + public void start() throws IgniteCheckedException { + srvEndpoint = IpcServerEndpointDeserializer.deserialize(endpointCfg); + + if (U.isWindows() && srvEndpoint instanceof IpcSharedMemoryServerEndpoint) + throw new IgniteCheckedException(IpcSharedMemoryServerEndpoint.class.getSimpleName() + + " should not be configured on Windows (configure " + + IpcServerTcpEndpoint.class.getSimpleName() + ")"); + + if (srvEndpoint instanceof IpcServerTcpEndpoint) { + IpcServerTcpEndpoint srvEndpoint0 = (IpcServerTcpEndpoint)srvEndpoint; + + srvEndpoint0.setManagement(mgmt); + + if (srvEndpoint0.getHost() == null) { + if (mgmt) { + String locHostName = ggfsCtx.kernalContext().config().getLocalHost(); + + try { + srvEndpoint0.setHost(U.resolveLocalHost(locHostName).getHostAddress()); + } + catch (IOException e) { + throw new IgniteCheckedException("Failed to resolve local host: " + locHostName, e); + } + } + else + // Bind non-management endpoint to 127.0.0.1 by default. + srvEndpoint0.setHost("127.0.0.1"); + } + } + + ggfsCtx.kernalContext().resource().injectGeneric(srvEndpoint); + + srvEndpoint.start(); + + // IpcServerEndpoint.getPort contract states return -1 if there is no port to be registered. + if (srvEndpoint.getPort() >= 0) + ggfsCtx.kernalContext().ports().registerPort(srvEndpoint.getPort(), TCP, srvEndpoint.getClass()); + + hnd = new IgfsIpcHandler(ggfsCtx); + + // Start client accept worker. + acceptWorker = new AcceptWorker(); + } + + /** + * Callback that is invoked when kernal is ready. + */ + public void onKernalStart() { + // Accept connections only when grid is ready. + if (srvEndpoint != null) + new IgniteThread(acceptWorker).start(); + } + + /** + * Stops this server. + * + * @param cancel Cancel flag. + */ + public void stop(boolean cancel) { + // Skip if did not start. + if (srvEndpoint == null) + return; + + // Stop accepting new client connections. + U.cancel(acceptWorker); + + U.join(acceptWorker, log); + + // Stop server handler, no more requests on existing connections will be processed. + try { + hnd.stop(); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to stop GGFS server handler (will close client connections anyway).", e); + } + + // Stop existing client connections. + for (ClientWorker worker : clientWorkers) + U.cancel(worker); + + U.join(clientWorkers, log); + + // IpcServerEndpoint.getPort contract states return -1 if there is no port to be registered. + if (srvEndpoint.getPort() >= 0) + ggfsCtx.kernalContext().ports().deregisterPort(srvEndpoint.getPort(), TCP, srvEndpoint.getClass()); + + try { + ggfsCtx.kernalContext().resource().cleanupGeneric(srvEndpoint); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to cleanup server endpoint.", e); + } + } + + /** + * Gets IPC server endpoint. + * + * @return IPC server endpoint. + */ + public IpcServerEndpoint getIpcServerEndpoint() { + return srvEndpoint; + } + + /** + * Client reader thread. + */ + private class ClientWorker extends GridWorker { + /** Connected client endpoint. */ + private IpcEndpoint endpoint; + + /** Data output stream. */ + private final IgfsDataOutputStream out; + + /** Client session object. */ + private IgfsClientSession ses; + + /** Queue node for fast unlink. */ + private ConcurrentLinkedDeque8.Node<ClientWorker> node; + + /** + * Creates client worker. + * + * @param idx Worker index for worker thread naming. + * @param endpoint Connected client endpoint. + * @throws IgniteCheckedException If endpoint output stream cannot be obtained. + */ + protected ClientWorker(IpcEndpoint endpoint, int idx) throws IgniteCheckedException { + super(ggfsCtx.kernalContext().gridName(), "ggfs-client-worker-" + idx, log); + + this.endpoint = endpoint; + + ses = new IgfsClientSession(); + + out = new IgfsDataOutputStream(new BufferedOutputStream(endpoint.outputStream())); + } + + /** {@inheritDoc} */ + @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException { + try { + IgfsDataInputStream dis = new IgfsDataInputStream(endpoint.inputStream()); + + byte[] hdr = new byte[IgfsMarshaller.HEADER_SIZE]; + + boolean first = true; + + while (!Thread.currentThread().isInterrupted()) { + dis.readFully(hdr); + + final long reqId = U.bytesToLong(hdr, 0); + + int ordinal = U.bytesToInt(hdr, 8); + + if (first) { // First message must be HANDSHAKE. + if (reqId != 0 || ordinal != IgfsIpcCommand.HANDSHAKE.ordinal()) { + U.warn(log, "Handshake failed."); + + return; + } + + first = false; + } + + final IgfsIpcCommand cmd = IgfsIpcCommand.valueOf(ordinal); + + IgfsMessage msg = marsh.unmarshall(cmd, hdr, dis); + + IgniteInternalFuture<IgfsMessage> fut = hnd.handleAsync(ses, msg, dis); + + // If fut is null, no response is required. + if (fut != null) { + if (fut.isDone()) { + IgfsMessage res; + + try { + res = fut.get(); + } + catch (IgniteCheckedException e) { + res = new IgfsControlResponse(); + + ((IgfsControlResponse)res).error(e); + } + + try { + synchronized (out) { + // Reuse header. + IgfsMarshaller.fillHeader(hdr, reqId, res.command()); + + marsh.marshall(res, hdr, out); + + out.flush(); + } + } + catch (IOException | IgniteCheckedException e) { + shutdown0(e); + } + } + else { + fut.listenAsync(new CIX1<IgniteInternalFuture<IgfsMessage>>() { + @Override public void applyx(IgniteInternalFuture<IgfsMessage> fut) { + IgfsMessage res; + + try { + res = fut.get(); + } + catch (IgniteCheckedException e) { + res = new IgfsControlResponse(); + + ((IgfsControlResponse)res).error(e); + } + + try { + synchronized (out) { + byte[] hdr = IgfsMarshaller.createHeader(reqId, res.command()); + + marsh.marshall(res, hdr, out); + + out.flush(); + } + } + catch (IOException | IgniteCheckedException e) { + shutdown0(e); + } + } + }); + } + } + } + } + catch (EOFException ignored) { + // Client closed connection. + } + catch (IgniteCheckedException | IOException e) { + if (!isCancelled()) + U.error(log, "Failed to read data from client (will close connection)", e); + } + finally { + onFinished(); + } + } + + /** + * @param node Node in queue for this worker. + */ + public void node(ConcurrentLinkedDeque8.Node<ClientWorker> node) { + this.node = node; + } + + /** {@inheritDoc} */ + @Override public void cancel() { + super.cancel(); + + shutdown0(null); + } + + /** + * @param e Optional exception occurred while stopping this + */ + private void shutdown0(@Nullable Throwable e) { + if (!isCancelled()) { + if (e != null) + U.error(log, "Stopping client reader due to exception: " + endpoint, e); + } + + U.closeQuiet(out); + + endpoint.close(); + } + + /** + * Final resource cleanup. + */ + private void onFinished() { + // Second close is no-op, if closed manually. + U.closeQuiet(out); + + endpoint.close(); + + // Finally, remove from queue. + if (clientWorkers.unlinkx(node)) + hnd.onClosed(ses); + } + } + + /** + * Accept worker. + */ + private class AcceptWorker extends GridWorker { + /** Accept index. */ + private int acceptCnt; + + /** + * Creates accept worker. + */ + protected AcceptWorker() { + super(ggfsCtx.kernalContext().gridName(), "ggfs-accept-worker", log); + } + + /** {@inheritDoc} */ + @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException { + try { + while (!Thread.currentThread().isInterrupted()) { + IpcEndpoint client = srvEndpoint.accept(); + + if (log.isDebugEnabled()) + log.debug("GGFS client connected [ggfsName=" + ggfsCtx.kernalContext().gridName() + + ", client=" + client + ']'); + + ClientWorker worker = new ClientWorker(client, acceptCnt++); + + IgniteThread workerThread = new IgniteThread(worker); + + ConcurrentLinkedDeque8.Node<ClientWorker> node = clientWorkers.addx(worker); + + worker.node(node); + + workerThread.start(); + } + } + catch (IgniteCheckedException e) { + if (!isCancelled()) + U.error(log, "Failed to accept client IPC connection (will shutdown accept thread).", e); + } + finally { + srvEndpoint.close(); + } + } + + /** {@inheritDoc} */ + @Override public void cancel() { + super.cancel(); + + srvEndpoint.close(); + } + } +}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServerHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServerHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServerHandler.java new file mode 100644 index 0000000..f3870ab --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServerHandler.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.igfs; + +import org.apache.ignite.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.igfs.common.*; +import org.jetbrains.annotations.*; + +import java.io.*; + +/** + * GGFS server message handler. Server component that is plugged in to the server implementation + * to handle incoming messages asynchronously. + */ +public interface IgfsServerHandler { + /** + * Asynchronously handles incoming message. + * + * @param ses Client session. + * @param msg Message to process. + * @param in Data input. Stream to read from in case if this is a WRITE_BLOCK message. + * @return Future that will be completed when response is ready or {@code null} if no + * response is required. + */ + @Nullable public IgniteInternalFuture<IgfsMessage> handleAsync(IgfsClientSession ses, + IgfsMessage msg, DataInput in); + + /** + * Handles handles client close events. + * + * @param ses Session that was closed. + */ + public void onClosed(IgfsClientSession ses); + + /** + * Stops handling of incoming requests. No server commands will be handled anymore. + * + * @throws IgniteCheckedException If error occurred. + */ + public void stop() throws IgniteCheckedException; +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServerManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServerManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServerManager.java new file mode 100644 index 0000000..cf99401 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServerManager.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.igfs; + +import org.apache.ignite.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.util.ipc.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.internal.util.worker.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.thread.*; + +import java.util.*; +import java.util.concurrent.*; + +import static org.apache.ignite.configuration.IgfsConfiguration.*; + +/** + * GGFS server manager. + */ +public class IgfsServerManager extends IgfsManager { + /** IPC server rebind interval. */ + private static final long REBIND_INTERVAL = 3000; + + /** Collection of servers to maintain. */ + private Collection<IgfsServer> srvrs; + + /** Server port binders. */ + private BindWorker bindWorker; + + /** Kernal start latch. */ + private CountDownLatch kernalStartLatch = new CountDownLatch(1); + + /** {@inheritDoc} */ + @Override protected void start0() throws IgniteCheckedException { + IgfsConfiguration ggfsCfg = igfsCtx.configuration(); + Map<String,String> cfg = ggfsCfg.getIpcEndpointConfiguration(); + + if (F.isEmpty(cfg)) { + // Set default configuration. + cfg = new HashMap<>(); + + cfg.put("type", U.isWindows() ? "tcp" : "shmem"); + cfg.put("port", String.valueOf(DFLT_IPC_PORT)); + } + + if (ggfsCfg.isIpcEndpointEnabled()) + bind(cfg, /*management*/false); + + if (ggfsCfg.getManagementPort() >= 0) { + cfg = new HashMap<>(); + + cfg.put("type", "tcp"); + cfg.put("port", String.valueOf(ggfsCfg.getManagementPort())); + + bind(cfg, /*management*/true); + } + + if (bindWorker != null) + new IgniteThread(bindWorker).start(); + } + + /** + * Tries to start server endpoint with specified configuration. If failed, will print warning and start a thread + * that will try to periodically start this endpoint. + * + * @param endpointCfg Endpoint configuration to start. + * @param mgmt {@code True} if endpoint is management. + * @throws IgniteCheckedException If failed. + */ + private void bind(final Map<String,String> endpointCfg, final boolean mgmt) throws IgniteCheckedException { + if (srvrs == null) + srvrs = new ConcurrentLinkedQueue<>(); + + IgfsServer ipcSrv = new IgfsServer(igfsCtx, endpointCfg, mgmt); + + try { + ipcSrv.start(); + + srvrs.add(ipcSrv); + } + catch (IpcEndpointBindException ignored) { + int port = ipcSrv.getIpcServerEndpoint().getPort(); + + String portMsg = port != -1 ? " Failed to bind to port (is port already in use?): " + port : ""; + + U.warn(log, "Failed to start GGFS " + (mgmt ? "management " : "") + "endpoint " + + "(will retry every " + (REBIND_INTERVAL / 1000) + "s)." + + portMsg); + + if (bindWorker == null) + bindWorker = new BindWorker(); + + bindWorker.addConfiguration(endpointCfg, mgmt); + } + } + + /** + * @return Collection of active endpoints. + */ + public Collection<IpcServerEndpoint> endpoints() { + return F.viewReadOnly(srvrs, new C1<IgfsServer, IpcServerEndpoint>() { + @Override public IpcServerEndpoint apply(IgfsServer e) { + return e.getIpcServerEndpoint(); + } + }); + } + + /** {@inheritDoc} */ + @Override protected void onKernalStart0() throws IgniteCheckedException { + if (!F.isEmpty(srvrs)) { + for (IgfsServer srv : srvrs) + srv.onKernalStart(); + } + + kernalStartLatch.countDown(); + } + + /** {@inheritDoc} */ + @Override protected void stop0(boolean cancel) { + // Safety. + kernalStartLatch.countDown(); + + if (bindWorker != null) { + bindWorker.cancel(); + + U.join(bindWorker, log); + } + + if (!F.isEmpty(srvrs)) { + for (IgfsServer srv : srvrs) + srv.stop(cancel); + } + } + + /** + * Bind worker. + */ + @SuppressWarnings("BusyWait") + private class BindWorker extends GridWorker { + /** Configurations to bind. */ + private Collection<IgniteBiTuple<Map<String, String>, Boolean>> bindCfgs = new LinkedList<>(); + + /** + * Constructor. + */ + private BindWorker() { + super(igfsCtx.kernalContext().gridName(), "bind-worker", igfsCtx.kernalContext().log()); + } + + /** + * Adds configuration to bind on. Should not be called after thread start. + * + * @param cfg Configuration. + * @param mgmt Management flag. + */ + public void addConfiguration(Map<String, String> cfg, boolean mgmt) { + bindCfgs.add(F.t(cfg, mgmt)); + } + + /** {@inheritDoc} */ + @Override protected void body() throws InterruptedException { + kernalStartLatch.await(); + + while (!isCancelled()) { + Thread.sleep(REBIND_INTERVAL); + + Iterator<IgniteBiTuple<Map<String, String>, Boolean>> it = bindCfgs.iterator(); + + while (it.hasNext()) { + IgniteBiTuple<Map<String, String>, Boolean> cfg = it.next(); + + IgfsServer ipcSrv = new IgfsServer(igfsCtx, cfg.get1(), cfg.get2()); + + try { + ipcSrv.start(); + + ipcSrv.onKernalStart(); + + srvrs.add(ipcSrv); + + it.remove(); + } + catch (IgniteCheckedException e) { + if (GridWorker.log.isDebugEnabled()) + GridWorker.log.debug("Failed to bind GGFS endpoint [cfg=" + cfg + ", err=" + e.getMessage() + ']'); + } + } + + if (bindCfgs.isEmpty()) + break; + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsStatus.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsStatus.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsStatus.java new file mode 100644 index 0000000..65ce0ce --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsStatus.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.igfs; + +import java.io.*; + +/** + * GGFS response for status request. + */ +public class IgfsStatus implements Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** Total space size. */ + private long spaceTotal; + + /** Used space in GGFS. */ + private long spaceUsed; + + /** + * Empty constructor required by {@link Externalizable}. + */ + public IgfsStatus() { + // No-op. + } + + /** + * @param spaceUsed Used space in GGFS. + * @param spaceTotal Total space available in GGFS. + */ + public IgfsStatus(long spaceUsed, long spaceTotal) { + this.spaceUsed = spaceUsed; + this.spaceTotal = spaceTotal; + } + + /** + * @return Total space available in GGFS. + */ + public long spaceTotal() { + return spaceTotal; + } + + /** + * @return Used space in GGFS. + */ + public long spaceUsed() { + return spaceUsed; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeLong(spaceUsed); + out.writeLong(spaceTotal); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + spaceUsed = in.readLong(); + spaceTotal = in.readLong(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSyncMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSyncMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSyncMessage.java new file mode 100644 index 0000000..9f45205 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSyncMessage.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.igfs; + +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.plugin.extensions.communication.*; + +import java.io.*; +import java.nio.*; + +/** + * Basic sync message. + */ +public class IgfsSyncMessage extends IgfsCommunicationMessage { + /** */ + private static final long serialVersionUID = 0L; + + /** Coordinator node order. */ + private long order; + + /** Response flag. */ + private boolean res; + + /** + * Empty constructor required by {@link Externalizable}. + */ + public IgfsSyncMessage() { + // No-op. + } + + /** + * @param order Node order. + * @param res Response flag. + */ + public IgfsSyncMessage(long order, boolean res) { + this.order = order; + this.res = res; + } + + /** + * @return Coordinator node order. + */ + public long order() { + return order; + } + + /** + * @return {@code True} if response message. + */ + public boolean response() { + return res; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(IgfsSyncMessage.class, this); + } + + /** {@inheritDoc} */ + @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"}) + @Override public MessageAdapter clone() { + IgfsSyncMessage _clone = new IgfsSyncMessage(); + + clone0(_clone); + + return _clone; + } + + /** {@inheritDoc} */ + @Override protected void clone0(MessageAdapter _msg) { + super.clone0(_msg); + + IgfsSyncMessage _clone = (IgfsSyncMessage)_msg; + + _clone.order = order; + _clone.res = res; + } + + /** {@inheritDoc} */ + @SuppressWarnings("all") + @Override public boolean writeTo(ByteBuffer buf) { + writer.setBuffer(buf); + + if (!super.writeTo(buf)) + return false; + + if (!typeWritten) { + if (!writer.writeByte(null, directType())) + return false; + + typeWritten = true; + } + + switch (state) { + case 0: + if (!writer.writeLong("order", order)) + return false; + + state++; + + case 1: + if (!writer.writeBoolean("res", res)) + return false; + + state++; + + } + + return true; + } + + /** {@inheritDoc} */ + @SuppressWarnings("all") + @Override public boolean readFrom(ByteBuffer buf) { + reader.setBuffer(buf); + + if (!super.readFrom(buf)) + return false; + + switch (state) { + case 0: + order = reader.readLong("order"); + + if (!reader.isLastRead()) + return false; + + state++; + + case 1: + res = reader.readBoolean("res"); + + if (!reader.isLastRead()) + return false; + + state++; + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return 71; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsTaskArgsImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsTaskArgsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsTaskArgsImpl.java new file mode 100644 index 0000000..222fed5 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsTaskArgsImpl.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.igfs; + +import org.apache.ignite.igfs.*; +import org.apache.ignite.igfs.mapreduce.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.io.*; +import java.util.*; + +/** + * GGFS task arguments implementation. + */ +public class IgfsTaskArgsImpl<T> implements IgfsTaskArgs<T>, Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** GGFS name. */ + private String ggfsName; + + /** Paths. */ + private Collection<IgfsPath> paths; + + /** Record resolver. */ + private IgfsRecordResolver recRslvr; + + /** Skip non existent files flag. */ + private boolean skipNonExistentFiles; + + /** Maximum range length. */ + private long maxRangeLen; + + /** User argument. */ + private T usrArg; + + /** + * {@link Externalizable} support. + */ + public IgfsTaskArgsImpl() { + // No-op. + } + + /** + * Constructor. + * + * @param ggfsName GGFS name. + * @param paths Paths. + * @param recRslvr Record resolver. + * @param skipNonExistentFiles Skip non existent files flag. + * @param maxRangeLen Maximum range length. + * @param usrArg User argument. + */ + public IgfsTaskArgsImpl(String ggfsName, Collection<IgfsPath> paths, IgfsRecordResolver recRslvr, + boolean skipNonExistentFiles, long maxRangeLen, T usrArg) { + this.ggfsName = ggfsName; + this.paths = paths; + this.recRslvr = recRslvr; + this.skipNonExistentFiles = skipNonExistentFiles; + this.maxRangeLen = maxRangeLen; + this.usrArg = usrArg; + } + + /** {@inheritDoc} */ + @Override public String ggfsName() { + return ggfsName; + } + + /** {@inheritDoc} */ + @Override public Collection<IgfsPath> paths() { + return paths; + } + + /** {@inheritDoc} */ + @Override public IgfsRecordResolver recordResolver() { + return recRslvr; + } + + /** {@inheritDoc} */ + @Override public boolean skipNonExistentFiles() { + return skipNonExistentFiles; + } + + /** {@inheritDoc} */ + @Override public long maxRangeLength() { + return maxRangeLen; + } + + /** {@inheritDoc} */ + @Override public T userArgument() { + return usrArg; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(IgfsTaskArgsImpl.class, this); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + U.writeString(out, ggfsName); + U.writeCollection(out, paths); + + out.writeObject(recRslvr); + out.writeBoolean(skipNonExistentFiles); + out.writeLong(maxRangeLen); + out.writeObject(usrArg); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + ggfsName = U.readString(in); + paths = U.readCollection(in); + + recRslvr = (IgfsRecordResolver)in.readObject(); + skipNonExistentFiles = in.readBoolean(); + maxRangeLen = in.readLong(); + usrArg = (T)in.readObject(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsThread.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsThread.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsThread.java new file mode 100644 index 0000000..533277f --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsThread.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.igfs; + +import org.apache.ignite.internal.util.typedef.*; + +/** + * GGFS ad-hoc thread. + */ +public abstract class IgfsThread extends Thread { + /** + * Creates {@code GGFS} add-hoc thread. + */ + protected IgfsThread() { + super("ggfs-worker"); + } + + /** + * Creates {@code GGFS} add-hoc thread. + * + * @param name Thread name. + */ + protected IgfsThread(String name) { + super(name); + } + + /** {@inheritDoc} */ + @Override public final void run() { + try { + body(); + } + catch (InterruptedException ignore) { + interrupt(); + } + // Catch all. + catch (Throwable e) { + X.error("Failed to execute GGFS ad-hoc thread: " + e.getMessage()); + + e.printStackTrace(); + } + finally { + try { + cleanup(); + } + // Catch all. + catch (Throwable e) { + X.error("Failed to clean up GGFS ad-hoc thread: " + e.getMessage()); + + e.printStackTrace(); + } + } + } + + /** + * Thread body. + * + * @throws InterruptedException If interrupted. + */ + protected abstract void body() throws InterruptedException; + + /** + * Cleanup. + */ + protected void cleanup() { + // No-op. + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/package.html ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/package.html b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/package.html new file mode 100644 index 0000000..60e49f9 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/package.html @@ -0,0 +1,24 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> +<html> +<body> + <!-- Package description. --> + Contains high performance file system processer. +</body> +</html> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/visor/ggfs/VisorIgfsSamplingStateTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/ggfs/VisorIgfsSamplingStateTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/ggfs/VisorIgfsSamplingStateTask.java index d855ead..4229988 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/ggfs/VisorIgfsSamplingStateTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/ggfs/VisorIgfsSamplingStateTask.java @@ -18,7 +18,7 @@ package org.apache.ignite.internal.visor.ggfs; import org.apache.ignite.*; -import org.apache.ignite.internal.processors.fs.*; +import org.apache.ignite.internal.processors.igfs.*; import org.apache.ignite.internal.processors.task.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.internal.visor.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java index ca8c257..f1b067f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java @@ -22,7 +22,7 @@ import org.apache.ignite.cache.*; import org.apache.ignite.cluster.*; import org.apache.ignite.configuration.*; import org.apache.ignite.internal.*; -import org.apache.ignite.internal.processors.fs.*; +import org.apache.ignite.internal.processors.igfs.*; import org.apache.ignite.internal.util.ipc.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.internal.visor.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/visor/util/VisorTaskUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/util/VisorTaskUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/util/VisorTaskUtils.java index a0e4871..057fa52 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/util/VisorTaskUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/util/VisorTaskUtils.java @@ -24,7 +24,7 @@ import org.apache.ignite.cache.eviction.lru.*; import org.apache.ignite.cache.eviction.random.*; import org.apache.ignite.cluster.*; import org.apache.ignite.events.*; -import org.apache.ignite.internal.processors.fs.*; +import org.apache.ignite.internal.processors.igfs.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.internal.visor.event.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/test/java/org/apache/ignite/igfs/IgfsEventsAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/igfs/IgfsEventsAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/igfs/IgfsEventsAbstractSelfTest.java index da54743..360e5eb 100644 --- a/modules/core/src/test/java/org/apache/ignite/igfs/IgfsEventsAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/igfs/IgfsEventsAbstractSelfTest.java @@ -21,7 +21,7 @@ import org.apache.ignite.*; import org.apache.ignite.configuration.*; import org.apache.ignite.events.*; import org.apache.ignite.internal.*; -import org.apache.ignite.internal.processors.fs.*; +import org.apache.ignite.internal.processors.igfs.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.lang.*; import org.apache.ignite.spi.discovery.tcp.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/test/java/org/apache/ignite/igfs/IgfsFragmentizerAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/igfs/IgfsFragmentizerAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/igfs/IgfsFragmentizerAbstractSelfTest.java index 97373a9..fbaa9ab 100644 --- a/modules/core/src/test/java/org/apache/ignite/igfs/IgfsFragmentizerAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/igfs/IgfsFragmentizerAbstractSelfTest.java @@ -19,7 +19,7 @@ package org.apache.ignite.igfs; import org.apache.ignite.cache.*; import org.apache.ignite.configuration.*; -import org.apache.ignite.internal.processors.fs.*; +import org.apache.ignite.internal.processors.igfs.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; import org.apache.ignite.spi.discovery.tcp.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/test/java/org/apache/ignite/igfs/IgfsFragmentizerSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/igfs/IgfsFragmentizerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/igfs/IgfsFragmentizerSelfTest.java index 9cfdd75..a8b2901 100644 --- a/modules/core/src/test/java/org/apache/ignite/igfs/IgfsFragmentizerSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/igfs/IgfsFragmentizerSelfTest.java @@ -20,7 +20,7 @@ package org.apache.ignite.igfs; import org.apache.ignite.*; import org.apache.ignite.cache.*; import org.apache.ignite.internal.*; -import org.apache.ignite.internal.processors.fs.*; +import org.apache.ignite.internal.processors.igfs.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.testframework.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/test/java/org/apache/ignite/internal/processors/fs/GridCacheIgfsPerBlockLruEvictionPolicySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/fs/GridCacheIgfsPerBlockLruEvictionPolicySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/fs/GridCacheIgfsPerBlockLruEvictionPolicySelfTest.java deleted file mode 100644 index 90a8172..0000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/fs/GridCacheIgfsPerBlockLruEvictionPolicySelfTest.java +++ /dev/null @@ -1,485 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.fs; - -import org.apache.ignite.*; -import org.apache.ignite.cache.*; -import org.apache.ignite.cache.eviction.ignitefs.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.igfs.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.processors.cache.*; -import org.apache.ignite.internal.util.lang.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.spi.discovery.tcp.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; -import org.apache.ignite.testframework.*; - -import java.util.*; -import java.util.concurrent.*; - -import static org.apache.ignite.cache.CacheAtomicityMode.*; -import static org.apache.ignite.cache.CacheMode.*; -import static org.apache.ignite.igfs.IgfsMode.*; - -/** - * Tests for GGFS per-block LR eviction policy. - */ -@SuppressWarnings({"ConstantConditions", "ThrowableResultOfMethodCallIgnored"}) -public class GridCacheIgfsPerBlockLruEvictionPolicySelfTest extends IgfsCommonAbstractTest { - /** Primary GGFS name. */ - private static final String GGFS_PRIMARY = "ggfs-primary"; - - /** Primary GGFS name. */ - private static final String GGFS_SECONDARY = "ggfs-secondary"; - - /** Secondary file system REST endpoint configuration map. */ - private static final Map<String, String> SECONDARY_REST_CFG = new HashMap<String, String>() {{ - put("type", "tcp"); - put("port", "11500"); - }}; - - /** File working in PRIMARY mode. */ - public static final IgfsPath FILE = new IgfsPath("/file"); - - /** File working in DUAL mode. */ - public static final IgfsPath FILE_RMT = new IgfsPath("/fileRemote"); - - /** Primary GGFS instances. */ - private static IgfsImpl ggfsPrimary; - - /** Secondary GGFS instance. */ - private static IgniteFs secondaryFs; - - /** Primary file system data cache. */ - private static GridCacheAdapter<IgfsBlockKey, byte[]> dataCache; - - /** Eviction policy */ - private static CacheIgfsPerBlockLruEvictionPolicy evictPlc; - - /** - * Start a grid with the primary file system. - * - * @throws Exception If failed. - */ - private void startPrimary() throws Exception { - IgfsConfiguration ggfsCfg = new IgfsConfiguration(); - - ggfsCfg.setDataCacheName("dataCache"); - ggfsCfg.setMetaCacheName("metaCache"); - ggfsCfg.setName(GGFS_PRIMARY); - ggfsCfg.setBlockSize(512); - ggfsCfg.setDefaultMode(PRIMARY); - ggfsCfg.setPrefetchBlocks(1); - ggfsCfg.setSequentialReadsBeforePrefetch(Integer.MAX_VALUE); - ggfsCfg.setSecondaryFileSystem(secondaryFs); - - Map<String, IgfsMode> pathModes = new HashMap<>(); - - pathModes.put(FILE_RMT.toString(), DUAL_SYNC); - - ggfsCfg.setPathModes(pathModes); - - CacheConfiguration dataCacheCfg = defaultCacheConfiguration(); - - dataCacheCfg.setName("dataCache"); - dataCacheCfg.setCacheMode(PARTITIONED); - dataCacheCfg.setDistributionMode(CacheDistributionMode.PARTITIONED_ONLY); - dataCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); - dataCacheCfg.setAtomicityMode(TRANSACTIONAL); - - evictPlc = new CacheIgfsPerBlockLruEvictionPolicy(); - - dataCacheCfg.setEvictionPolicy(evictPlc); - dataCacheCfg.setAffinityMapper(new IgfsGroupDataBlocksKeyMapper(128)); - dataCacheCfg.setBackups(0); - dataCacheCfg.setQueryIndexEnabled(false); - - CacheConfiguration metaCacheCfg = defaultCacheConfiguration(); - - metaCacheCfg.setName("metaCache"); - metaCacheCfg.setCacheMode(REPLICATED); - metaCacheCfg.setDistributionMode(CacheDistributionMode.PARTITIONED_ONLY); - metaCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); - metaCacheCfg.setQueryIndexEnabled(false); - metaCacheCfg.setAtomicityMode(TRANSACTIONAL); - - IgniteConfiguration cfg = new IgniteConfiguration(); - - cfg.setGridName("grid-primary"); - - TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); - - discoSpi.setIpFinder(new TcpDiscoveryVmIpFinder(true)); - - cfg.setDiscoverySpi(discoSpi); - cfg.setCacheConfiguration(dataCacheCfg, metaCacheCfg); - cfg.setGgfsConfiguration(ggfsCfg); - - cfg.setLocalHost("127.0.0.1"); - cfg.setConnectorConfiguration(null); - - Ignite g = G.start(cfg); - - ggfsPrimary = (IgfsImpl)g.fileSystem(GGFS_PRIMARY); - - dataCache = ggfsPrimary.context().kernalContext().cache().internalCache( - ggfsPrimary.context().configuration().getDataCacheName()); - } - - /** - * Start a grid with the secondary file system. - * - * @throws Exception If failed. - */ - private void startSecondary() throws Exception { - IgfsConfiguration ggfsCfg = new IgfsConfiguration(); - - ggfsCfg.setDataCacheName("dataCache"); - ggfsCfg.setMetaCacheName("metaCache"); - ggfsCfg.setName(GGFS_SECONDARY); - ggfsCfg.setBlockSize(512); - ggfsCfg.setDefaultMode(PRIMARY); - ggfsCfg.setIpcEndpointConfiguration(SECONDARY_REST_CFG); - - CacheConfiguration dataCacheCfg = defaultCacheConfiguration(); - - dataCacheCfg.setName("dataCache"); - dataCacheCfg.setCacheMode(PARTITIONED); - dataCacheCfg.setDistributionMode(CacheDistributionMode.PARTITIONED_ONLY); - dataCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); - dataCacheCfg.setAffinityMapper(new IgfsGroupDataBlocksKeyMapper(128)); - dataCacheCfg.setBackups(0); - dataCacheCfg.setQueryIndexEnabled(false); - dataCacheCfg.setAtomicityMode(TRANSACTIONAL); - - CacheConfiguration metaCacheCfg = defaultCacheConfiguration(); - - metaCacheCfg.setName("metaCache"); - metaCacheCfg.setCacheMode(REPLICATED); - metaCacheCfg.setDistributionMode(CacheDistributionMode.PARTITIONED_ONLY); - metaCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); - metaCacheCfg.setQueryIndexEnabled(false); - metaCacheCfg.setAtomicityMode(TRANSACTIONAL); - - IgniteConfiguration cfg = new IgniteConfiguration(); - - cfg.setGridName("grid-secondary"); - - TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); - - discoSpi.setIpFinder(new TcpDiscoveryVmIpFinder(true)); - - cfg.setDiscoverySpi(discoSpi); - cfg.setCacheConfiguration(dataCacheCfg, metaCacheCfg); - cfg.setGgfsConfiguration(ggfsCfg); - - cfg.setLocalHost("127.0.0.1"); - cfg.setConnectorConfiguration(null); - - Ignite g = G.start(cfg); - - secondaryFs = g.fileSystem(GGFS_SECONDARY); - } - - /** {@inheritDoc} */ - @Override protected void afterTest() throws Exception { - try { - // Cleanup. - ggfsPrimary.format(); - - while (!dataCache.isEmpty()) - U.sleep(100); - - checkEvictionPolicy(0, 0); - } - finally { - stopAllGrids(false); - } - } - - /** - * Startup primary and secondary file systems. - * - * @throws Exception If failed. - */ - private void start() throws Exception { - startSecondary(); - startPrimary(); - - evictPlc.setMaxBlocks(0); - evictPlc.setMaxSize(0); - evictPlc.setExcludePaths(null); - } - - /** - * Test how evictions are handled for a file working in PRIMARY mode. - * - * @throws Exception If failed. - */ - public void testFilePrimary() throws Exception { - start(); - - // Create file in primary mode. It must not be propagated to eviction policy. - ggfsPrimary.create(FILE, true).close(); - - checkEvictionPolicy(0, 0); - - int blockSize = ggfsPrimary.info(FILE).blockSize(); - - append(FILE, blockSize); - - checkEvictionPolicy(0, 0); - - read(FILE, 0, blockSize); - - checkEvictionPolicy(0, 0); - } - - /** - * Test how evictions are handled for a file working in PRIMARY mode. - * - * @throws Exception If failed. - */ - public void testFileDual() throws Exception { - start(); - - ggfsPrimary.create(FILE_RMT, true).close(); - - checkEvictionPolicy(0, 0); - - int blockSize = ggfsPrimary.info(FILE_RMT).blockSize(); - - // File write. - append(FILE_RMT, blockSize); - - checkEvictionPolicy(1, blockSize); - - // One more write. - append(FILE_RMT, blockSize); - - checkEvictionPolicy(2, blockSize * 2); - - // Read. - read(FILE_RMT, 0, blockSize); - - checkEvictionPolicy(2, blockSize * 2); - } - - /** - * Ensure that a DUAL mode file is not propagated to eviction policy - * - * @throws Exception If failed. - */ - public void testFileDualExclusion() throws Exception { - start(); - - evictPlc.setExcludePaths(Collections.singleton(FILE_RMT.toString())); - - // Create file in primary mode. It must not be propagated to eviction policy. - ggfsPrimary.create(FILE_RMT, true).close(); - - checkEvictionPolicy(0, 0); - - int blockSize = ggfsPrimary.info(FILE_RMT).blockSize(); - - append(FILE_RMT, blockSize); - - checkEvictionPolicy(0, 0); - - read(FILE_RMT, 0, blockSize); - - checkEvictionPolicy(0, 0); - } - - /** - * Ensure that exception is thrown in case we are trying to rename file with one exclude setting to the file with - * another. - * - * @throws Exception If failed. - */ - public void testRenameDifferentExcludeSettings() throws Exception { - start(); - - GridTestUtils.assertThrows(log, new Callable<Object>() { - @Override public Object call() throws Exception { - ggfsPrimary.rename(FILE, FILE_RMT); - - return null; - } - }, IgfsInvalidPathException.class, "Cannot move file to a path with different eviction exclude setting " + - "(need to copy and remove)"); - - GridTestUtils.assertThrows(log, new Callable<Object>() { - @Override public Object call() throws Exception { - ggfsPrimary.rename(FILE_RMT, FILE); - - return null; - } - }, IgfsInvalidPathException.class, "Cannot move file to a path with different eviction exclude setting " + - "(need to copy and remove)"); - } - - /** - * Test eviction caused by too much blocks. - * - * @throws Exception If failed. - */ - public void testBlockCountEviction() throws Exception { - start(); - - int blockCnt = 3; - - evictPlc.setMaxBlocks(blockCnt); - - ggfsPrimary.create(FILE_RMT, true).close(); - - checkEvictionPolicy(0, 0); - - int blockSize = ggfsPrimary.info(FILE_RMT).blockSize(); - - // Write blocks up to the limit. - append(FILE_RMT, blockSize * blockCnt); - - checkEvictionPolicy(blockCnt, blockCnt * blockSize); - - // Write one more block what should cause eviction. - append(FILE_RMT, blockSize); - - checkEvictionPolicy(blockCnt, blockCnt * blockSize); - - // Read the first block. - read(FILE_RMT, 0, blockSize); - - checkEvictionPolicy(blockCnt, blockCnt * blockSize); - checkMetrics(1, 1); - } - - /** - * Test eviction caused by too big data size. - * - * @throws Exception If failed. - */ - public void testDataSizeEviction() throws Exception { - start(); - - ggfsPrimary.create(FILE_RMT, true).close(); - - int blockCnt = 3; - int blockSize = ggfsPrimary.info(FILE_RMT).blockSize(); - - evictPlc.setMaxSize(blockSize * blockCnt); - - // Write blocks up to the limit. - append(FILE_RMT, blockSize * blockCnt); - - checkEvictionPolicy(blockCnt, blockCnt * blockSize); - - // Reset metrics. - ggfsPrimary.resetMetrics(); - - // Read the first block what should cause reordering. - read(FILE_RMT, 0, blockSize); - - checkMetrics(1, 0); - checkEvictionPolicy(blockCnt, blockCnt * blockSize); - - // Write one more block what should cause eviction of the block 2. - append(FILE_RMT, blockSize); - - checkEvictionPolicy(blockCnt, blockCnt * blockSize); - - // Read the first block. - read(FILE_RMT, 0, blockSize); - - checkMetrics(2, 0); - checkEvictionPolicy(blockCnt, blockCnt * blockSize); - - // Read the second block (which was evicted). - read(FILE_RMT, blockSize, blockSize); - - checkMetrics(3, 1); - checkEvictionPolicy(blockCnt, blockCnt * blockSize); - } - - /** - * Read some data from the given file with the given offset. - * - * @param path File path. - * @param off Offset. - * @param len Length. - * @throws Exception If failed. - */ - private void read(IgfsPath path, int off, int len) throws Exception { - IgfsInputStream is = ggfsPrimary.open(path); - - is.readFully(off, new byte[len]); - - is.close(); - } - - /** - * Append some data to the given file. - * - * @param path File path. - * @param len Data length. - * @throws Exception If failed. - */ - private void append(IgfsPath path, int len) throws Exception { - IgfsOutputStream os = ggfsPrimary.append(path, false); - - os.write(new byte[len]); - - os.close(); - } - - /** - * Check metrics counters. - * - * @param blocksRead Expected blocks read. - * @param blocksReadRmt Expected blocks read remote. - * @throws Exception If failed. - */ - public void checkMetrics(final long blocksRead, final long blocksReadRmt) throws Exception { - assert GridTestUtils.waitForCondition(new GridAbsPredicate() { - @Override public boolean apply() { - IgfsMetrics metrics = ggfsPrimary.metrics(); - - return metrics.blocksReadTotal() == blocksRead && metrics.blocksReadRemote() == blocksReadRmt; - } - }, 5000) : "Unexpected metrics [expectedBlocksReadTotal=" + blocksRead + ", actualBlocksReadTotal=" + - ggfsPrimary.metrics().blocksReadTotal() + ", expectedBlocksReadRemote=" + blocksReadRmt + - ", actualBlocksReadRemote=" + ggfsPrimary.metrics().blocksReadRemote() + ']'; - } - - /** - * Check eviction policy state. - * - * @param curBlocks Current blocks. - * @param curBytes Current bytes. - */ - private void checkEvictionPolicy(final int curBlocks, final long curBytes) throws IgniteInterruptedCheckedException { - assert GridTestUtils.waitForCondition(new GridAbsPredicate() { - @Override public boolean apply() { - return evictPlc.getCurrentBlocks() == curBlocks && evictPlc.getCurrentSize() == curBytes; - } - }, 5000) : "Unexpected counts [expectedBlocks=" + curBlocks + ", actualBlocks=" + evictPlc.getCurrentBlocks() + - ", expectedBytes=" + curBytes + ", currentBytes=" + curBytes + ']'; - } -}
