http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsServer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsServer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsServer.java deleted file mode 100644 index a1170b6..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsServer.java +++ /dev/null @@ -1,427 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.internal.processors.fs; - -import org.apache.ignite.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.igfs.common.*; -import org.apache.ignite.internal.util.ipc.*; -import org.apache.ignite.internal.util.ipc.loopback.*; -import org.apache.ignite.internal.util.ipc.shmem.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.internal.util.worker.*; -import org.apache.ignite.thread.*; -import org.jdk8.backport.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.util.*; - -import static org.apache.ignite.spi.IgnitePortProtocol.*; - -/** - * GGFS server. Accepts client connections on the configured IPC endpoint and handles requests passed - * from GGFS clients, running one {@link ClientWorker} per accepted connection. - */ -public class IgfsServer { - /** GGFS context. */ - private final IgfsContext ggfsCtx; - - /** Logger. */ - private final IgniteLogger log; - - /** GGFS marshaller. */ - private final IgfsMarshaller marsh; - - /** Endpoint configuration. */ - private final Map<String,String> endpointCfg; - - /** Server endpoint. */ - private IpcServerEndpoint srvEndpoint; - - /** Server message handler. */ - private IgfsServerHandler hnd; - - /** Accept worker. */ - private AcceptWorker acceptWorker; - - /** Started client workers. */ - private ConcurrentLinkedDeque8<ClientWorker> clientWorkers = new ConcurrentLinkedDeque8<>(); - - /** Flag indicating if this is a management endpoint. */ - private final boolean mgmt; - - /** - * Constructs GGFS server. - * @param ggfsCtx GGFS context. - * @param endpointCfg Endpoint configuration to start. - * @param mgmt Management flag - if true, server is intended to be started for Visor.
- */ - public IgfsServer(IgfsContext ggfsCtx, Map<String, String> endpointCfg, boolean mgmt) { - assert ggfsCtx != null; - assert endpointCfg != null; - - this.endpointCfg = endpointCfg; - this.ggfsCtx = ggfsCtx; - this.mgmt = mgmt; - - log = ggfsCtx.kernalContext().log(IgfsServer.class); - - marsh = new IgfsMarshaller(); - } - - /** - * Starts this server: deserializes the endpoint from the configuration, starts it, registers its port - * (if any) and creates the accept worker. The accept worker thread itself is started in - * {@link #onKernalStart()}. - * - * @throws IgniteCheckedException If failed. - */ - public void start() throws IgniteCheckedException { - srvEndpoint = IpcServerEndpointDeserializer.deserialize(endpointCfg); - - if (U.isWindows() && srvEndpoint instanceof IpcSharedMemoryServerEndpoint) - throw new IgniteCheckedException(IpcSharedMemoryServerEndpoint.class.getSimpleName() + - " should not be configured on Windows (configure " + - IpcServerTcpEndpoint.class.getSimpleName() + ")"); - - if (srvEndpoint instanceof IpcServerTcpEndpoint) { - IpcServerTcpEndpoint srvEndpoint0 = (IpcServerTcpEndpoint)srvEndpoint; - - srvEndpoint0.setManagement(mgmt); - - if (srvEndpoint0.getHost() == null) { - if (mgmt) { - String locHostName = ggfsCtx.kernalContext().config().getLocalHost(); - - try { - srvEndpoint0.setHost(U.resolveLocalHost(locHostName).getHostAddress()); - } - catch (IOException e) { - throw new IgniteCheckedException("Failed to resolve local host: " + locHostName, e); - } - } - else - // Bind non-management endpoint to 127.0.0.1 by default. - srvEndpoint0.setHost("127.0.0.1"); - } - } - - ggfsCtx.kernalContext().resource().injectGeneric(srvEndpoint); - - srvEndpoint.start(); - - // IpcServerEndpoint.getPort contract states return -1 if there is no port to be registered. - if (srvEndpoint.getPort() >= 0) - ggfsCtx.kernalContext().ports().registerPort(srvEndpoint.getPort(), TCP, srvEndpoint.getClass()); - - hnd = new IgfsIpcHandler(ggfsCtx); - - // Create client accept worker (its thread is started in onKernalStart()). - acceptWorker = new AcceptWorker(); - } - - /** - * Callback that is invoked when kernal is ready.
- */ - public void onKernalStart() { - // Accept connections only when grid is ready. - if (srvEndpoint != null) - new IgniteThread(acceptWorker).start(); - } - - /** - * Stops this server. - * - * @param cancel Cancel flag (not used by this method). - */ - public void stop(boolean cancel) { - // Skip if did not start. - if (srvEndpoint == null) - return; - - // Stop accepting new client connections. - U.cancel(acceptWorker); - - U.join(acceptWorker, log); - - // Stop server handler, no more requests on existing connections will be processed. - try { - hnd.stop(); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to stop GGFS server handler (will close client connections anyway).", e); - } - - // Stop existing client connections. - for (ClientWorker worker : clientWorkers) - U.cancel(worker); - - U.join(clientWorkers, log); - - // IpcServerEndpoint.getPort contract states return -1 if there is no port to be registered. - if (srvEndpoint.getPort() >= 0) - ggfsCtx.kernalContext().ports().deregisterPort(srvEndpoint.getPort(), TCP, srvEndpoint.getClass()); - - try { - ggfsCtx.kernalContext().resource().cleanupGeneric(srvEndpoint); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to cleanup server endpoint.", e); - } - } - - /** - * Gets IPC server endpoint. - * - * @return IPC server endpoint, or {@code null} if {@link #start()} has not been called. - */ - public IpcServerEndpoint getIpcServerEndpoint() { - return srvEndpoint; - } - - /** - * Client reader thread. Reads requests from a single connected client, passes them to the server handler - * and writes responses back. - */ - private class ClientWorker extends GridWorker { - /** Connected client endpoint. */ - private IpcEndpoint endpoint; - - /** Data output stream. */ - private final IgfsDataOutputStream out; - - /** Client session object. */ - private IgfsClientSession ses; - - /** Queue node for fast unlink. */ - private ConcurrentLinkedDeque8.Node<ClientWorker> node; - - /** - * Creates client worker. - * - * @param endpoint Connected client endpoint. - * @param idx Worker index for worker thread naming.
- * @throws IgniteCheckedException If endpoint output stream cannot be obtained. - */ - protected ClientWorker(IpcEndpoint endpoint, int idx) throws IgniteCheckedException { - super(ggfsCtx.kernalContext().gridName(), "ggfs-client-worker-" + idx, log); - - this.endpoint = endpoint; - - ses = new IgfsClientSession(); - - out = new IgfsDataOutputStream(new BufferedOutputStream(endpoint.outputStream())); - } - - /** {@inheritDoc} */ - @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException { - try { - IgfsDataInputStream dis = new IgfsDataInputStream(endpoint.inputStream()); - - byte[] hdr = new byte[IgfsMarshaller.HEADER_SIZE]; - - boolean first = true; - - while (!Thread.currentThread().isInterrupted()) { - dis.readFully(hdr); - - final long reqId = U.bytesToLong(hdr, 0); - - int ordinal = U.bytesToInt(hdr, 8); - - if (first) { // First message must be HANDSHAKE. - if (reqId != 0 || ordinal != IgfsIpcCommand.HANDSHAKE.ordinal()) { - U.warn(log, "Handshake failed."); - - return; - } - - first = false; - } - - final IgfsIpcCommand cmd = IgfsIpcCommand.valueOf(ordinal); - - IgfsMessage msg = marsh.unmarshall(cmd, hdr, dis); - - IgniteInternalFuture<IgfsMessage> fut = hnd.handleAsync(ses, msg, dis); - - // If fut is null, no response is required. - if (fut != null) { - if (fut.isDone()) { - IgfsMessage res; - - try { - res = fut.get(); - } - catch (IgniteCheckedException e) { - res = new IgfsControlResponse(); - - ((IgfsControlResponse)res).error(e); - } - - try { - synchronized (out) { - // Reuse header.
- IgfsMarshaller.fillHeader(hdr, reqId, res.command()); - - marsh.marshall(res, hdr, out); - - out.flush(); - } - } - catch (IOException | IgniteCheckedException e) { - shutdown0(e); - } - } - else { - fut.listenAsync(new CIX1<IgniteInternalFuture<IgfsMessage>>() { - @Override public void applyx(IgniteInternalFuture<IgfsMessage> fut) { - IgfsMessage res; - - try { - res = fut.get(); - } - catch (IgniteCheckedException e) { - res = new IgfsControlResponse(); - - ((IgfsControlResponse)res).error(e); - } - - try { - synchronized (out) { - byte[] hdr = IgfsMarshaller.createHeader(reqId, res.command()); - - marsh.marshall(res, hdr, out); - - out.flush(); - } - } - catch (IOException | IgniteCheckedException e) { - shutdown0(e); - } - } - }); - } - } - } - } - catch (EOFException ignored) { - // Client closed connection. - } - catch (IgniteCheckedException | IOException e) { - if (!isCancelled()) - U.error(log, "Failed to read data from client (will close connection)", e); - } - finally { - onFinished(); - } - } - - /** - * @param node Node in queue for this worker. - */ - public void node(ConcurrentLinkedDeque8.Node<ClientWorker> node) { - this.node = node; - } - - /** {@inheritDoc} */ - @Override public void cancel() { - super.cancel(); - - shutdown0(null); - } - - /** - * Closes the output stream and the client endpoint. - * - * @param e Optional exception that caused the shutdown; it is logged only if this worker is not cancelled. - */ - private void shutdown0(@Nullable Throwable e) { - if (!isCancelled()) { - if (e != null) - U.error(log, "Stopping client reader due to exception: " + endpoint, e); - } - - U.closeQuiet(out); - - endpoint.close(); - } - - /** - * Final resource cleanup. - */ - private void onFinished() { - // Second close is no-op, if closed manually. - U.closeQuiet(out); - - endpoint.close(); - - // Finally, remove from queue. - if (clientWorkers.unlinkx(node)) - hnd.onClosed(ses); - } - } - - /** - * Accept worker. Accepts client connections on the server endpoint and starts a {@link ClientWorker} - * thread for each of them. - */ - private class AcceptWorker extends GridWorker { - /** Accept index. */ - private int acceptCnt; - - /** - * Creates accept worker.
- */ - protected AcceptWorker() { - super(ggfsCtx.kernalContext().gridName(), "ggfs-accept-worker", log); - } - - /** {@inheritDoc} */ - @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException { - try { - while (!Thread.currentThread().isInterrupted()) { - IpcEndpoint client = srvEndpoint.accept(); - - if (log.isDebugEnabled()) - log.debug("GGFS client connected [ggfsName=" + ggfsCtx.kernalContext().gridName() + - ", client=" + client + ']'); - - ClientWorker worker = new ClientWorker(client, acceptCnt++); - - IgniteThread workerThread = new IgniteThread(worker); - - ConcurrentLinkedDeque8.Node<ClientWorker> node = clientWorkers.addx(worker); - - worker.node(node); - - workerThread.start(); - } - } - catch (IgniteCheckedException e) { - if (!isCancelled()) - U.error(log, "Failed to accept client IPC connection (will shutdown accept thread).", e); - } - finally { - srvEndpoint.close(); - } - } - - /** {@inheritDoc} */ - @Override public void cancel() { - super.cancel(); - - srvEndpoint.close(); - } - } -}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsServerHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsServerHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsServerHandler.java deleted file mode 100644 index d7c7e11..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsServerHandler.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.fs; - -import org.apache.ignite.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.igfs.common.*; -import org.jetbrains.annotations.*; - -import java.io.*; - -/** - * GGFS server message handler. Server component that is plugged in to the server implementation - * to handle incoming messages asynchronously. - */ -public interface IgfsServerHandler { - /** - * Asynchronously handles incoming message. - * - * @param ses Client session. - * @param msg Message to process. - * @param in Data input.
Stream to read from in case if this is a WRITE_BLOCK message. - * @return Future that will be completed when response is ready or {@code null} if no - * response is required. - */ - @Nullable public IgniteInternalFuture<IgfsMessage> handleAsync(IgfsClientSession ses, - IgfsMessage msg, DataInput in); - - /** - * Handles client close events. - * - * @param ses Session that was closed. - */ - public void onClosed(IgfsClientSession ses); - - /** - * Stops handling of incoming requests. No server commands will be handled anymore. - * - * @throws IgniteCheckedException If error occurred. - */ - public void stop() throws IgniteCheckedException; -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsServerManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsServerManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsServerManager.java deleted file mode 100644 index 07322e8..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsServerManager.java +++ /dev/null @@ -1,211 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.fs; - -import org.apache.ignite.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.internal.util.ipc.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.internal.util.worker.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.thread.*; - -import java.util.*; -import java.util.concurrent.*; - -import static org.apache.ignite.configuration.IgfsConfiguration.*; - -/** - * GGFS server manager. Starts the configured IPC server endpoints (regular and management) and keeps - * retrying, via {@link BindWorker}, the endpoints that failed to bind. - */ -public class IgfsServerManager extends IgfsManager { - /** IPC server rebind interval, in milliseconds. */ - private static final long REBIND_INTERVAL = 3000; - - /** Collection of servers to maintain. */ - private Collection<IgfsServer> srvrs; - - /** Worker that retries endpoints which failed to bind; created only after the first bind failure. */ - private BindWorker bindWorker; - - /** Kernal start latch. */ - private CountDownLatch kernalStartLatch = new CountDownLatch(1); - - /** {@inheritDoc} */ - @Override protected void start0() throws IgniteCheckedException { - IgfsConfiguration ggfsCfg = igfsCtx.configuration(); - Map<String,String> cfg = ggfsCfg.getIpcEndpointConfiguration(); - - if (F.isEmpty(cfg)) { - // Set default configuration. - cfg = new HashMap<>(); - - cfg.put("type", U.isWindows() ? "tcp" : "shmem"); - cfg.put("port", String.valueOf(DFLT_IPC_PORT)); - } - - if (ggfsCfg.isIpcEndpointEnabled()) - bind(cfg, /*management*/false); - - if (ggfsCfg.getManagementPort() >= 0) { - cfg = new HashMap<>(); - - cfg.put("type", "tcp"); - cfg.put("port", String.valueOf(ggfsCfg.getManagementPort())); - - bind(cfg, /*management*/true); - } - - if (bindWorker != null) - new IgniteThread(bindWorker).start(); - } - - /** - * Tries to start server endpoint with specified configuration. If failed, will print warning and start a thread - * that will try to periodically start this endpoint.
- * - * @param endpointCfg Endpoint configuration to start. - * @param mgmt {@code True} if endpoint is management. - * @throws IgniteCheckedException If failed. - */ - private void bind(final Map<String,String> endpointCfg, final boolean mgmt) throws IgniteCheckedException { - if (srvrs == null) - srvrs = new ConcurrentLinkedQueue<>(); - - IgfsServer ipcSrv = new IgfsServer(igfsCtx, endpointCfg, mgmt); - - try { - ipcSrv.start(); - - srvrs.add(ipcSrv); - } - catch (IpcEndpointBindException ignored) { - int port = ipcSrv.getIpcServerEndpoint().getPort(); - - String portMsg = port != -1 ? " Failed to bind to port (is port already in use?): " + port : ""; - - U.warn(log, "Failed to start GGFS " + (mgmt ? "management " : "") + "endpoint " + - "(will retry every " + (REBIND_INTERVAL / 1000) + "s)." + - portMsg); - - if (bindWorker == null) - bindWorker = new BindWorker(); - - bindWorker.addConfiguration(endpointCfg, mgmt); - } - } - - /** - * @return Collection of active endpoints. - */ - public Collection<IpcServerEndpoint> endpoints() { - return F.viewReadOnly(srvrs, new C1<IgfsServer, IpcServerEndpoint>() { - @Override public IpcServerEndpoint apply(IgfsServer e) { - return e.getIpcServerEndpoint(); - } - }); - } - - /** {@inheritDoc} */ - @Override protected void onKernalStart0() throws IgniteCheckedException { - if (!F.isEmpty(srvrs)) { - for (IgfsServer srv : srvrs) - srv.onKernalStart(); - } - - kernalStartLatch.countDown(); - } - - /** {@inheritDoc} */ - @Override protected void stop0(boolean cancel) { - // Safety: release the bind worker if it is still waiting on the kernal start latch. - kernalStartLatch.countDown(); - - if (bindWorker != null) { - bindWorker.cancel(); - - U.join(bindWorker, log); - } - - if (!F.isEmpty(srvrs)) { - for (IgfsServer srv : srvrs) - srv.stop(cancel); - } - } - - /** - * Bind worker. Periodically retries to start the endpoints that failed to bind and drops each - * configuration from the pending list once its server has started. - */ - @SuppressWarnings("BusyWait") - private class BindWorker extends GridWorker { - /** Configurations to bind.
*/ - private Collection<IgniteBiTuple<Map<String, String>, Boolean>> bindCfgs = new LinkedList<>(); - - /** - * Constructor. - */ - private BindWorker() { - super(igfsCtx.kernalContext().gridName(), "bind-worker", igfsCtx.kernalContext().log()); - } - - /** - * Adds configuration to bind on. Should not be called after thread start. - * - * @param cfg Configuration. - * @param mgmt Management flag. - */ - public void addConfiguration(Map<String, String> cfg, boolean mgmt) { - bindCfgs.add(F.t(cfg, mgmt)); - } - - /** {@inheritDoc} Waits for the kernal start latch, then after every {@code REBIND_INTERVAL} sleep tries to start each pending endpoint; the loop ends once no pending configurations remain or the worker is cancelled. */ - @Override protected void body() throws InterruptedException { - kernalStartLatch.await(); - - while (!isCancelled()) { - Thread.sleep(REBIND_INTERVAL); - - Iterator<IgniteBiTuple<Map<String, String>, Boolean>> it = bindCfgs.iterator(); - - while (it.hasNext()) { - IgniteBiTuple<Map<String, String>, Boolean> cfg = it.next(); - - IgfsServer ipcSrv = new IgfsServer(igfsCtx, cfg.get1(), cfg.get2()); - - try { - ipcSrv.start(); - - ipcSrv.onKernalStart(); - - srvrs.add(ipcSrv); - - it.remove(); - } - catch (IgniteCheckedException e) { - if (GridWorker.log.isDebugEnabled()) - GridWorker.log.debug("Failed to bind GGFS endpoint [cfg=" + cfg + ", err=" + e.getMessage() + ']'); - } - } - - if (bindCfgs.isEmpty()) - break; - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsStatus.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsStatus.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsStatus.java deleted file mode 100644 index 2300774..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsStatus.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements.
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.fs; - -import java.io.*; - -/** - * GGFS response for status request. Carries the used and total space values. - */ -public class IgfsStatus implements Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** Total space available in GGFS. */ - private long spaceTotal; - - /** Used space in GGFS. */ - private long spaceUsed; - - /** - * Empty constructor required by {@link Externalizable}. - */ - public IgfsStatus() { - // No-op. - } - - /** - * @param spaceUsed Used space in GGFS. - * @param spaceTotal Total space available in GGFS. - */ - public IgfsStatus(long spaceUsed, long spaceTotal) { - this.spaceUsed = spaceUsed; - this.spaceTotal = spaceTotal; - } - - /** - * @return Total space available in GGFS. - */ - public long spaceTotal() { - return spaceTotal; - } - - /** - * @return Used space in GGFS.
- */ - public long spaceUsed() { - return spaceUsed; - } - - /** {@inheritDoc} Writes {@code spaceUsed} followed by {@code spaceTotal}. */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeLong(spaceUsed); - out.writeLong(spaceTotal); - } - - /** {@inheritDoc} Reads the fields in the order written by {@link #writeExternal(ObjectOutput)}. */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - spaceUsed = in.readLong(); - spaceTotal = in.readLong(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsSyncMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsSyncMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsSyncMessage.java deleted file mode 100644 index 41f4145..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsSyncMessage.java +++ /dev/null @@ -1,161 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License.
- */ - -package org.apache.ignite.internal.processors.fs; - -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.plugin.extensions.communication.*; - -import java.io.*; -import java.nio.*; - -/** - * Basic sync message carrying a coordinator node order and a response flag. - */ -public class IgfsSyncMessage extends IgfsCommunicationMessage { - /** */ - private static final long serialVersionUID = 0L; - - /** Coordinator node order. */ - private long order; - - /** Response flag. */ - private boolean res; - - /** - * Empty constructor required by {@link Externalizable}. - */ - public IgfsSyncMessage() { - // No-op. - } - - /** - * @param order Coordinator node order. - * @param res Response flag. - */ - public IgfsSyncMessage(long order, boolean res) { - this.order = order; - this.res = res; - } - - /** - * @return Coordinator node order. - */ - public long order() { - return order; - } - - /** - * @return {@code True} if response message. - */ - public boolean response() { - return res; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(IgfsSyncMessage.class, this); - } - - /** {@inheritDoc} */ - @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"}) - @Override public MessageAdapter clone() { - IgfsSyncMessage _clone = new IgfsSyncMessage(); - - clone0(_clone); - - return _clone; - } - - /** {@inheritDoc} */ - @Override protected void clone0(MessageAdapter _msg) { - super.clone0(_msg); - - IgfsSyncMessage _clone = (IgfsSyncMessage)_msg; - - _clone.order = order; - _clone.res = res; - } - - /** {@inheritDoc} Writes {@code order} and then {@code res}. The cases of the {@code state} switch fall through without {@code break}; each returns {@code false} when the writer call returns {@code false} and otherwise increments {@code state}. */ - @SuppressWarnings("all") - @Override public boolean writeTo(ByteBuffer buf) { - writer.setBuffer(buf); - - if (!super.writeTo(buf)) - return false; - - if (!typeWritten) { - if (!writer.writeByte(null, directType())) - return false; - - typeWritten = true; - } - - switch (state) { - case 0: - if (!writer.writeLong("order", order)) - return false; - - state++; - - case 1: - if (!writer.writeBoolean("res", res)) - return false; - -
state++; - - } - - return true; - } - - /** {@inheritDoc} Reads {@code order} and then {@code res}. The cases of the {@code state} switch fall through without {@code break}; each returns {@code false} when {@code reader.isLastRead()} is {@code false} and otherwise increments {@code state}. */ - @SuppressWarnings("all") - @Override public boolean readFrom(ByteBuffer buf) { - reader.setBuffer(buf); - - if (!super.readFrom(buf)) - return false; - - switch (state) { - case 0: - order = reader.readLong("order"); - - if (!reader.isLastRead()) - return false; - - state++; - - case 1: - res = reader.readBoolean("res"); - - if (!reader.isLastRead()) - return false; - - state++; - - } - - return true; - } - - /** {@inheritDoc} */ - @Override public byte directType() { - return 71; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsTaskArgsImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsTaskArgsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsTaskArgsImpl.java deleted file mode 100644 index 7139c12..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsTaskArgsImpl.java +++ /dev/null @@ -1,135 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License.
- */ - -package org.apache.ignite.internal.processors.fs; - -import org.apache.ignite.igfs.*; -import org.apache.ignite.igfs.mapreduce.*; -import org.apache.ignite.internal.util.typedef.internal.*; - -import java.io.*; -import java.util.*; - -/** - * GGFS task arguments implementation. - * - * @param <T> User argument type. - */ -public class IgfsTaskArgsImpl<T> implements IgfsTaskArgs<T>, Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** GGFS name. */ - private String ggfsName; - - /** Paths. */ - private Collection<IgfsPath> paths; - - /** Record resolver. */ - private IgfsRecordResolver recRslvr; - - /** Skip non-existent files flag. */ - private boolean skipNonExistentFiles; - - /** Maximum range length. */ - private long maxRangeLen; - - /** User argument. */ - private T usrArg; - - /** - * {@link Externalizable} support. - */ - public IgfsTaskArgsImpl() { - // No-op. - } - - /** - * Constructor. - * - * @param ggfsName GGFS name. - * @param paths Paths. - * @param recRslvr Record resolver. - * @param skipNonExistentFiles Skip non-existent files flag. - * @param maxRangeLen Maximum range length. - * @param usrArg User argument.
- */ - public IgfsTaskArgsImpl(String ggfsName, Collection<IgfsPath> paths, IgfsRecordResolver recRslvr, - boolean skipNonExistentFiles, long maxRangeLen, T usrArg) { - this.ggfsName = ggfsName; - this.paths = paths; - this.recRslvr = recRslvr; - this.skipNonExistentFiles = skipNonExistentFiles; - this.maxRangeLen = maxRangeLen; - this.usrArg = usrArg; - } - - /** {@inheritDoc} */ - @Override public String ggfsName() { - return ggfsName; - } - - /** {@inheritDoc} */ - @Override public Collection<IgfsPath> paths() { - return paths; - } - - /** {@inheritDoc} */ - @Override public IgfsRecordResolver recordResolver() { - return recRslvr; - } - - /** {@inheritDoc} */ - @Override public boolean skipNonExistentFiles() { - return skipNonExistentFiles; - } - - /** {@inheritDoc} */ - @Override public long maxRangeLength() { - return maxRangeLen; - } - - /** {@inheritDoc} */ - @Override public T userArgument() { - return usrArg; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(IgfsTaskArgsImpl.class, this); - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - U.writeString(out, ggfsName); - U.writeCollection(out, paths); - - out.writeObject(recRslvr); - out.writeBoolean(skipNonExistentFiles); - out.writeLong(maxRangeLen); - out.writeObject(usrArg); - } - - /** {@inheritDoc} Reads the fields in the same order in which {@link #writeExternal(ObjectOutput)} writes them. */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - ggfsName = U.readString(in); - paths = U.readCollection(in); - - recRslvr = (IgfsRecordResolver)in.readObject(); - skipNonExistentFiles = in.readBoolean(); - maxRangeLen = in.readLong(); - usrArg = (T)in.readObject(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsThread.java ---------------------------------------------------------------------- diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsThread.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsThread.java deleted file mode 100644 index ba1fd6e..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/IgfsThread.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.fs; - -import org.apache.ignite.internal.util.typedef.*; - -/** - * GGFS ad-hoc thread. - */ -public abstract class IgfsThread extends Thread { - /** - * Creates {@code GGFS} add-hoc thread. - */ - protected IgfsThread() { - super("ggfs-worker"); - } - - /** - * Creates {@code GGFS} add-hoc thread. - * - * @param name Thread name. - */ - protected IgfsThread(String name) { - super(name); - } - - /** {@inheritDoc} */ - @Override public final void run() { - try { - body(); - } - catch (InterruptedException ignore) { - interrupt(); - } - // Catch all. - catch (Throwable e) { - X.error("Failed to execute GGFS ad-hoc thread: " + e.getMessage()); - - e.printStackTrace(); - } - finally { - try { - cleanup(); - } - // Catch all. 
- catch (Throwable e) { - X.error("Failed to clean up GGFS ad-hoc thread: " + e.getMessage()); - - e.printStackTrace(); - } - } - } - - /** - * Thread body. - * - * @throws InterruptedException If interrupted. - */ - protected abstract void body() throws InterruptedException; - - /** - * Cleanup. - */ - protected void cleanup() { - // No-op. - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/package.html ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/package.html b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/package.html deleted file mode 100644 index 60e49f9..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/package.html +++ /dev/null @@ -1,24 +0,0 @@ -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. ---> - -<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> -<html> -<body> - <!-- Package description. --> - Contains high performance file system processor. 
-</body> -</html> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAckMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAckMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAckMessage.java new file mode 100644 index 0000000..e1051c6 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAckMessage.java @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.igfs; + +import org.apache.ignite.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.marshaller.*; +import org.apache.ignite.plugin.extensions.communication.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.nio.*; + +/** + * Block write request acknowledgement message. + */ +public class IgfsAckMessage extends IgfsCommunicationMessage { + /** */ + private static final long serialVersionUID = 0L; + + /** File id. 
*/ + private IgniteUuid fileId; + + /** Request ID to ack. */ + private long id; + + /** Write exception. */ + @GridDirectTransient + private IgniteCheckedException err; + + /** */ + private byte[] errBytes; + + /** + * Empty constructor required by {@link Externalizable}. + */ + public IgfsAckMessage() { + // No-op. + } + + /** + * @param fileId File ID. + * @param id Request ID. + * @param err Error. + */ + public IgfsAckMessage(IgniteUuid fileId, long id, @Nullable IgniteCheckedException err) { + this.fileId = fileId; + this.id = id; + this.err = err; + } + + /** + * @return File ID. + */ + public IgniteUuid fileId() { + return fileId; + } + + /** + * @return Batch ID. + */ + public long id() { + return id; + } + + /** + * @return Error occurred when writing this batch, if any. + */ + public IgniteCheckedException error() { + return err; + } + + /** {@inheritDoc} */ + @Override public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException { + super.prepareMarshal(marsh); + + if (err != null) + errBytes = marsh.marshal(err); + } + + /** {@inheritDoc} */ + @Override public void finishUnmarshal(Marshaller marsh, @Nullable ClassLoader ldr) throws IgniteCheckedException { + super.finishUnmarshal(marsh, ldr); + + if (errBytes != null) + err = marsh.unmarshal(errBytes, ldr); + } + + /** {@inheritDoc} */ + @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"}) + @Override public MessageAdapter clone() { + IgfsAckMessage _clone = new IgfsAckMessage(); + + clone0(_clone); + + return _clone; + } + + /** {@inheritDoc} */ + @Override protected void clone0(MessageAdapter _msg) { + super.clone0(_msg); + + IgfsAckMessage _clone = (IgfsAckMessage)_msg; + + _clone.fileId = fileId; + _clone.id = id; + _clone.err = err; + _clone.errBytes = errBytes; + } + + /** {@inheritDoc} */ + @SuppressWarnings("all") + @Override public boolean writeTo(ByteBuffer buf) { + writer.setBuffer(buf); + + if (!super.writeTo(buf)) + return false; + + if (!typeWritten) { 
+ if (!writer.writeByte(null, directType())) + return false; + + typeWritten = true; + } + + switch (state) { + case 0: + if (!writer.writeByteArray("errBytes", errBytes)) + return false; + + state++; + + case 1: + if (!writer.writeIgniteUuid("fileId", fileId)) + return false; + + state++; + + case 2: + if (!writer.writeLong("id", id)) + return false; + + state++; + + } + + return true; + } + + /** {@inheritDoc} */ + @SuppressWarnings("all") + @Override public boolean readFrom(ByteBuffer buf) { + reader.setBuffer(buf); + + if (!super.readFrom(buf)) + return false; + + switch (state) { + case 0: + errBytes = reader.readByteArray("errBytes"); + + if (!reader.isLastRead()) + return false; + + state++; + + case 1: + fileId = reader.readIgniteUuid("fileId"); + + if (!reader.isLastRead()) + return false; + + state++; + + case 2: + id = reader.readLong("id"); + + if (!reader.isLastRead()) + return false; + + state++; + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return 64; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java new file mode 100644 index 0000000..da37279 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java @@ -0,0 +1,316 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.igfs; + +import org.apache.ignite.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.igfs.*; +import org.apache.ignite.igfs.mapreduce.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; +import org.jetbrains.annotations.*; + +import java.net.*; +import java.util.*; + +/** + * Ggfs supporting asynchronous operations. + */ +public class IgfsAsyncImpl extends AsyncSupportAdapter<IgniteFs> implements IgfsEx { + /** */ + private final IgfsImpl ggfs; + + /** + * @param ggfs Ggfs. 
+ */ + public IgfsAsyncImpl(IgfsImpl ggfs) { + super(true); + + this.ggfs = ggfs; + } + + /** {@inheritDoc} */ + @Override public void format() { + try { + saveOrGet(ggfs.formatAsync()); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + } + + /** {@inheritDoc} */ + @Override public <T, R> R execute(IgfsTask<T, R> task, @Nullable IgfsRecordResolver rslvr, + Collection<IgfsPath> paths, @Nullable T arg) { + try { + return saveOrGet(ggfs.executeAsync(task, rslvr, paths, arg)); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + } + + /** {@inheritDoc} */ + @Override public <T, R> R execute(IgfsTask<T, R> task, @Nullable IgfsRecordResolver rslvr, + Collection<IgfsPath> paths, boolean skipNonExistentFiles, long maxRangeLen, @Nullable T arg) { + try { + return saveOrGet(ggfs.executeAsync(task, rslvr, paths, skipNonExistentFiles, maxRangeLen, arg)); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + } + + /** {@inheritDoc} */ + @Override public <T, R> R execute(Class<? extends IgfsTask<T, R>> taskCls, + @Nullable IgfsRecordResolver rslvr, Collection<IgfsPath> paths, @Nullable T arg) { + try { + return saveOrGet(ggfs.executeAsync(taskCls, rslvr, paths, arg)); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + } + + /** {@inheritDoc} */ + @Override public <T, R> R execute(Class<? 
extends IgfsTask<T, R>> taskCls, + @Nullable IgfsRecordResolver rslvr, Collection<IgfsPath> paths, boolean skipNonExistentFiles, + long maxRangeLen, @Nullable T arg) { + try { + return saveOrGet(ggfs.executeAsync(taskCls, rslvr, paths, skipNonExistentFiles, maxRangeLen, arg)); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + } + + /** {@inheritDoc} */ + @Override public void stop() { + ggfs.stop(); + } + + /** {@inheritDoc} */ + @Override public IgfsContext context() { + return ggfs.context(); + } + + /** {@inheritDoc} */ + @Override public IgfsPaths proxyPaths() { + return ggfs.proxyPaths(); + } + + /** {@inheritDoc} */ + @Override public IgfsInputStreamAdapter open(IgfsPath path, int bufSize, + int seqReadsBeforePrefetch) { + return ggfs.open(path, bufSize, seqReadsBeforePrefetch); + } + + /** {@inheritDoc} */ + @Override public IgfsInputStreamAdapter open(IgfsPath path) { + return ggfs.open(path); + } + + /** {@inheritDoc} */ + @Override public IgfsInputStreamAdapter open(IgfsPath path, int bufSize) { + return ggfs.open(path, bufSize); + } + + /** {@inheritDoc} */ + @Override public IgfsStatus globalSpace() throws IgniteCheckedException { + return ggfs.globalSpace(); + } + + /** {@inheritDoc} */ + @Override public void globalSampling(@Nullable Boolean val) throws IgniteCheckedException { + ggfs.globalSampling(val); + } + + /** {@inheritDoc} */ + @Nullable @Override public Boolean globalSampling() { + return ggfs.globalSampling(); + } + + /** {@inheritDoc} */ + @Override public IgfsLocalMetrics localMetrics() { + return ggfs.localMetrics(); + } + + /** {@inheritDoc} */ + @Override public long groupBlockSize() { + return ggfs.groupBlockSize(); + } + + /** {@inheritDoc} */ + @Override public IgniteInternalFuture<?> awaitDeletesAsync() throws IgniteCheckedException { + return ggfs.awaitDeletesAsync(); + } + + /** {@inheritDoc} */ + @Nullable @Override public String clientLogDirectory() { + return ggfs.clientLogDirectory(); + } + + /** 
{@inheritDoc} */ + @Override public void clientLogDirectory(String logDir) { + ggfs.clientLogDirectory(logDir); + } + + /** {@inheritDoc} */ + @Override public boolean evictExclude(IgfsPath path, boolean primary) { + return ggfs.evictExclude(path, primary); + } + + /** {@inheritDoc} */ + @Override public IgniteUuid nextAffinityKey() { + return ggfs.nextAffinityKey(); + } + + /** {@inheritDoc} */ + @Override public boolean isProxy(URI path) { + return ggfs.isProxy(path); + } + + /** {@inheritDoc} */ + @Nullable @Override public String name() { + return ggfs.name(); + } + + /** {@inheritDoc} */ + @Override public IgfsConfiguration configuration() { + return ggfs.configuration(); + } + + /** {@inheritDoc} */ + @Override public IgfsPathSummary summary(IgfsPath path) { + return ggfs.summary(path); + } + + /** {@inheritDoc} */ + @Override public IgfsOutputStream create(IgfsPath path, boolean overwrite) { + return ggfs.create(path, overwrite); + } + + /** {@inheritDoc} */ + @Override public IgfsOutputStream create(IgfsPath path, int bufSize, boolean overwrite, int replication, + long blockSize, @Nullable Map<String, String> props) { + return ggfs.create(path, bufSize, overwrite, replication, blockSize, props); + } + + /** {@inheritDoc} */ + @Override public IgfsOutputStream create(IgfsPath path, int bufSize, boolean overwrite, + @Nullable IgniteUuid affKey, int replication, long blockSize, @Nullable Map<String, String> props) { + return ggfs.create(path, bufSize, overwrite, affKey, replication, blockSize, props); + } + + /** {@inheritDoc} */ + @Override public IgfsOutputStream append(IgfsPath path, boolean create) { + return ggfs.append(path, create); + } + + /** {@inheritDoc} */ + @Override public IgfsOutputStream append(IgfsPath path, int bufSize, boolean create, + @Nullable Map<String, String> props) { + return ggfs.append(path, bufSize, create, props); + } + + /** {@inheritDoc} */ + @Override public void setTimes(IgfsPath path, long accessTime, long modificationTime) 
{ + ggfs.setTimes(path, accessTime, modificationTime); + } + + /** {@inheritDoc} */ + @Override public Collection<IgfsBlockLocation> affinity(IgfsPath path, long start, long len) { + return ggfs.affinity(path, start, len); + } + + /** {@inheritDoc} */ + @Override public Collection<IgfsBlockLocation> affinity(IgfsPath path, long start, long len, long maxLen) { + return ggfs.affinity(path, start, len, maxLen); + } + + /** {@inheritDoc} */ + @Override public IgfsMetrics metrics() { + return ggfs.metrics(); + } + + /** {@inheritDoc} */ + @Override public void resetMetrics() { + ggfs.resetMetrics(); + } + + /** {@inheritDoc} */ + @Override public long size(IgfsPath path) { + return ggfs.size(path); + } + + /** {@inheritDoc} */ + @Override public boolean exists(IgfsPath path) { + return ggfs.exists(path); + } + + /** {@inheritDoc} */ + @Nullable @Override public IgfsFile update(IgfsPath path, Map<String, String> props) { + return ggfs.update(path, props); + } + + /** {@inheritDoc} */ + @Override public void rename(IgfsPath src, IgfsPath dest) { + ggfs.rename(src, dest); + } + + /** {@inheritDoc} */ + @Override public boolean delete(IgfsPath path, boolean recursive) { + return ggfs.delete(path, recursive); + } + + /** {@inheritDoc} */ + @Override public void mkdirs(IgfsPath path) { + ggfs.mkdirs(path); + } + + /** {@inheritDoc} */ + @Override public void mkdirs(IgfsPath path, @Nullable Map<String, String> props) { + ggfs.mkdirs(path, props); + } + + /** {@inheritDoc} */ + @Override public Collection<IgfsPath> listPaths(IgfsPath path) { + return ggfs.listPaths(path); + } + + /** {@inheritDoc} */ + @Override public Collection<IgfsFile> listFiles(IgfsPath path) { + return ggfs.listFiles(path); + } + + /** {@inheritDoc} */ + @Nullable @Override public IgfsFile info(IgfsPath path) { + return ggfs.info(path); + } + + /** {@inheritDoc} */ + @Override public long usedSpaceSize() { + return ggfs.usedSpaceSize(); + } + + /** {@inheritDoc} */ + @Override public Map<String, String> 
properties() { + return ggfs.properties(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAttributes.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAttributes.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAttributes.java new file mode 100644 index 0000000..31b696e --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAttributes.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.igfs; + +import org.apache.ignite.igfs.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.io.*; +import java.util.*; + +/** + * GGFS attributes. + * <p> + * This class contains information on a single GGFS configured on some node. + */ +public class IgfsAttributes implements Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** GGFS name. */ + private String ggfsName; + + /** File's data block size (bytes). 
*/ + private int blockSize; + + /** Size of the group figured in {@link org.apache.ignite.igfs.IgfsGroupDataBlocksKeyMapper}. */ + private int grpSize; + + /** Meta cache name. */ + private String metaCacheName; + + /** Data cache name. */ + private String dataCacheName; + + /** Default mode. */ + private IgfsMode dfltMode; + + /** Fragmentizer enabled flag. */ + private boolean fragmentizerEnabled; + + /** Path modes. */ + private Map<String, IgfsMode> pathModes; + + /** + * @param ggfsName GGFS name. + * @param blockSize File's data block size (bytes). + * @param grpSize Size of the group figured in {@link org.apache.ignite.igfs.IgfsGroupDataBlocksKeyMapper}. + * @param metaCacheName Meta cache name. + * @param dataCacheName Data cache name. + * @param dfltMode Default mode. + * @param pathModes Path modes. + */ + public IgfsAttributes(String ggfsName, int blockSize, int grpSize, String metaCacheName, String dataCacheName, + IgfsMode dfltMode, Map<String, IgfsMode> pathModes, boolean fragmentizerEnabled) { + this.blockSize = blockSize; + this.ggfsName = ggfsName; + this.grpSize = grpSize; + this.metaCacheName = metaCacheName; + this.dataCacheName = dataCacheName; + this.dfltMode = dfltMode; + this.pathModes = pathModes; + this.fragmentizerEnabled = fragmentizerEnabled; + } + + /** + * Public no-arg constructor for {@link Externalizable}. + */ + public IgfsAttributes() { + // No-op. + } + + /** + * @return GGFS name. + */ + public String ggfsName() { + return ggfsName; + } + + /** + * @return File's data block size (bytes). + */ + public int blockSize() { + return blockSize; + } + + /** + * @return Size of the group figured in {@link org.apache.ignite.igfs.IgfsGroupDataBlocksKeyMapper}. + */ + public int groupSize() { + return grpSize; + } + + /** + * @return Metadata cache name. + */ + public String metaCacheName() { + return metaCacheName; + } + + /** + * @return Data cache name. 
+ */ + public String dataCacheName() { + return dataCacheName; + } + + /** + * @return Default mode. + */ + public IgfsMode defaultMode() { + return dfltMode; + } + + /** + * @return Path modes. + */ + public Map<String, IgfsMode> pathModes() { + return pathModes != null ? Collections.unmodifiableMap(pathModes) : null; + } + + /** + * @return {@code True} if fragmentizer is enabled. + */ + public boolean fragmentizerEnabled() { + return fragmentizerEnabled; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + U.writeString(out, ggfsName); + out.writeInt(blockSize); + out.writeInt(grpSize); + U.writeString(out, metaCacheName); + U.writeString(out, dataCacheName); + U.writeEnum(out, dfltMode); + out.writeBoolean(fragmentizerEnabled); + + if (pathModes != null) { + out.writeBoolean(true); + + out.writeInt(pathModes.size()); + + for (Map.Entry<String, IgfsMode> pathMode : pathModes.entrySet()) { + U.writeString(out, pathMode.getKey()); + U.writeEnum(out, pathMode.getValue()); + } + } + else + out.writeBoolean(false); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + ggfsName = U.readString(in); + blockSize = in.readInt(); + grpSize = in.readInt(); + metaCacheName = U.readString(in); + dataCacheName = U.readString(in); + dfltMode = IgfsMode.fromOrdinal(in.readByte()); + fragmentizerEnabled = in.readBoolean(); + + if (in.readBoolean()) { + int size = in.readInt(); + + pathModes = new HashMap<>(size, 1.0f); + + for (int i = 0; i < size; i++) + pathModes.put(U.readString(in), IgfsMode.fromOrdinal(in.readByte())); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsBlockKey.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsBlockKey.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsBlockKey.java new file mode 100644 index 0000000..304095c --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsBlockKey.java @@ -0,0 +1,274 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.igfs; + +import org.apache.ignite.internal.processors.task.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.plugin.extensions.communication.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.nio.*; + +/** + * File's binary data block key. + */ +@GridInternal +public final class IgfsBlockKey extends MessageAdapter implements Externalizable, Comparable<IgfsBlockKey> { + /** */ + private static final long serialVersionUID = 0L; + + /** File system file ID. */ + private IgniteUuid fileId; + + /** Block ID. */ + private long blockId; + + /** Block affinity key. */ + private IgniteUuid affKey; + + /** Eviction exclude flag. 
*/ + private boolean evictExclude; + + /** + * Constructs file's binary data block key. + * + * @param fileId File ID. + * @param affKey Affinity key. + * @param evictExclude Evict exclude flag. + * @param blockId Block ID. + */ + public IgfsBlockKey(IgniteUuid fileId, @Nullable IgniteUuid affKey, boolean evictExclude, long blockId) { + assert fileId != null; + assert blockId >= 0; + + this.fileId = fileId; + this.affKey = affKey; + this.evictExclude = evictExclude; + this.blockId = blockId; + } + + /** + * Empty constructor required for {@link Externalizable}. + */ + public IgfsBlockKey() { + // No-op. + } + + /** + * @return File ID. + */ + public IgniteUuid getFileId() { + return fileId; + } + + /** + * @return Block affinity key. + */ + public IgniteUuid affinityKey() { + return affKey; + } + + /** + * @return Evict exclude flag. + */ + public boolean evictExclude() { + return evictExclude; + } + + /** + * @return Block ID. + */ + public long getBlockId() { + return blockId; + } + + /** {@inheritDoc} */ + @Override public int compareTo(@NotNull IgfsBlockKey o) { + int res = fileId.compareTo(o.fileId); + + if (res != 0) + return res; + + long v1 = blockId; + long v2 = o.blockId; + + if (v1 != v2) + return v1 > v2 ? 1 : -1; + + if (affKey == null && o.affKey == null) + return 0; + + if (affKey != null && o.affKey != null) + return affKey.compareTo(o.affKey); + + return affKey != null ? 
-1 : 1; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + U.writeGridUuid(out, fileId); + U.writeGridUuid(out, affKey); + out.writeBoolean(evictExclude); + out.writeLong(blockId); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException { + fileId = U.readGridUuid(in); + affKey = U.readGridUuid(in); + evictExclude = in.readBoolean(); + blockId = in.readLong(); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return fileId.hashCode() + (int)(blockId ^ (blockId >>> 32)); + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (o == this) + return true; + + if (o == null || o.getClass() != getClass()) + return false; + + IgfsBlockKey that = (IgfsBlockKey)o; + + return blockId == that.blockId && fileId.equals(that.fileId) && F.eq(affKey, that.affKey) && + evictExclude == that.evictExclude; + } + + /** {@inheritDoc} */ + @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"}) + @Override public MessageAdapter clone() { + IgfsBlockKey _clone = new IgfsBlockKey(); + + clone0(_clone); + + return _clone; + } + + /** {@inheritDoc} */ + @Override protected void clone0(MessageAdapter _msg) { + IgfsBlockKey _clone = (IgfsBlockKey)_msg; + + _clone.fileId = fileId; + _clone.blockId = blockId; + _clone.affKey = affKey; + _clone.evictExclude = evictExclude; + } + + /** {@inheritDoc} */ + @SuppressWarnings("fallthrough") + @Override public boolean writeTo(ByteBuffer buf) { + writer.setBuffer(buf); + + if (!typeWritten) { + if (!writer.writeByte(null, directType())) + return false; + + typeWritten = true; + } + + switch (state) { + case 0: + if (!writer.writeIgniteUuid("affKey", affKey)) + return false; + + state++; + + case 1: + if (!writer.writeLong("blockId", blockId)) + return false; + + state++; + + case 2: + if (!writer.writeBoolean("evictExclude", evictExclude)) + return false; + + state++; + + case 3: + if 
(!writer.writeIgniteUuid("fileId", fileId)) + return false; + + state++; + + } + + return true; + } + + /** {@inheritDoc} */ + @SuppressWarnings("fallthrough") + @Override public boolean readFrom(ByteBuffer buf) { + reader.setBuffer(buf); + + switch (state) { + case 0: + affKey = reader.readIgniteUuid("affKey"); + + if (!reader.isLastRead()) + return false; + + state++; + + case 1: + blockId = reader.readLong("blockId"); + + if (!reader.isLastRead()) + return false; + + state++; + + case 2: + evictExclude = reader.readBoolean("evictExclude"); + + if (!reader.isLastRead()) + return false; + + state++; + + case 3: + fileId = reader.readIgniteUuid("fileId"); + + if (!reader.isLastRead()) + return false; + + state++; + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return 65; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(IgfsBlockKey.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsBlockLocationImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsBlockLocationImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsBlockLocationImpl.java new file mode 100644 index 0000000..bbea11b --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsBlockLocationImpl.java @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package org.apache.ignite.internal.processors.igfs;

import org.apache.ignite.*;
import org.apache.ignite.cluster.*;
import org.apache.ignite.igfs.*;
import org.apache.ignite.internal.util.tostring.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;

import java.io.*;
import java.net.*;
import java.util.*;

/**
 * File block location in the grid.
 */
public class IgfsBlockLocationImpl implements IgfsBlockLocation, Externalizable {
    /** */
    private static final long serialVersionUID = 0L;

    /** Start position. */
    private long start;

    /** Length. */
    private long len;

    /** Node IDs. May be {@code null}: the serialized form carries an explicit presence flag for this field. */
    @GridToStringInclude
    private Collection<UUID> nodeIds;

    /** Location names ({@code host:port} strings when built from cluster nodes). */
    private Collection<String> names;

    /** Host names. */
    private Collection<String> hosts;

    /**
     * Empty constructor for externalizable.
     */
    public IgfsBlockLocationImpl() {
        // No-op.
    }

    /**
     * Creates a location that takes start position, node IDs, names and hosts from the given
     * location and uses the given length. The collections are shared with the source location,
     * not copied.
     *
     * @param location Block location to take the data from.
     * @param len New length.
     */
    public IgfsBlockLocationImpl(IgfsBlockLocation location, long len) {
        assert location != null;

        start = location.start();
        this.len = len;

        nodeIds = location.nodeIds();
        names = location.names();
        hosts = location.hosts();
    }

    /**
     * @param start Start.
     * @param len Length.
     * @param nodes Affinity nodes.
     */
    public IgfsBlockLocationImpl(long start, long len, Collection<ClusterNode> nodes) {
        assert start >= 0;
        assert len > 0;
        assert nodes != null && !nodes.isEmpty();

        this.start = start;
        this.len = len;

        convertFromNodes(nodes);
    }

    /**
     * @return Start position.
     */
    @Override public long start() {
        return start;
    }

    /**
     * @return Length.
     */
    @Override public long length() {
        return len;
    }

    /**
     * @return Node IDs.
     */
    @Override public Collection<UUID> nodeIds() {
        return nodeIds;
    }

    /** {@inheritDoc} */
    @Override public Collection<String> names() {
        return names;
    }

    /** {@inheritDoc} */
    @Override public Collection<String> hosts() {
        return hosts;
    }

    /**
     * Computes the hash from start position, length and node IDs.
     *
     * @return Hash code.
     */
    @Override public int hashCode() {
        int res = (int)(start ^ (start >>> 32));

        res = 31 * res + (int)(len ^ (len >>> 32));

        // Node IDs are optional (see the presence flag in writeExternal/readExternal), so hashing
        // must tolerate null just like the null-safe comparison in equals().
        res = 31 * res + (nodeIds == null ? 0 : nodeIds.hashCode());

        return res;
    }

    /** {@inheritDoc} */
    @Override public boolean equals(Object o) {
        if (o == this)
            return true;

        if (o == null || getClass() != o.getClass())
            return false;

        IgfsBlockLocationImpl that = (IgfsBlockLocationImpl)o;

        return len == that.len && start == that.start && F.eq(nodeIds, that.nodeIds) && F.eq(names, that.names) &&
            F.eq(hosts, that.hosts);
    }

    /** {@inheritDoc} */
    @Override public String toString() {
        return S.toString(IgfsBlockLocationImpl.class, this);
    }

    /**
     * Writes this object to the given output: start, length, a presence flag followed by the
     * size-prefixed node IDs (only when node IDs are set), then size-prefixed names and
     * size-prefixed hosts.
     *
     * @param out Data output to write.
     * @throws IOException If write failed.
     */
    @Override public void writeExternal(ObjectOutput out) throws IOException {
        assert names != null;
        assert hosts != null;

        out.writeLong(start);
        out.writeLong(len);

        out.writeBoolean(nodeIds != null);

        if (nodeIds != null) {
            out.writeInt(nodeIds.size());

            for (UUID nodeId : nodeIds)
                U.writeUuid(out, nodeId);
        }

        out.writeInt(names.size());

        for (String name : names)
            out.writeUTF(name);

        out.writeInt(hosts.size());

        for (String host : hosts)
            out.writeUTF(host);
    }

    /**
     * Reads this object from the given input, in the same order
     * {@link #writeExternal(ObjectOutput)} writes it. Node IDs are left untouched when the
     * presence flag is {@code false}.
     *
     * @param in Data input.
     * @throws IOException If read failed.
     */
    @Override public void readExternal(ObjectInput in) throws IOException {
        start = in.readLong();
        len = in.readLong();

        int size;

        if (in.readBoolean()) {
            size = in.readInt();

            nodeIds = new ArrayList<>(size);

            for (int i = 0; i < size; i++)
                nodeIds.add(U.readUuid(in));
        }

        size = in.readInt();

        names = new ArrayList<>(size);

        for (int i = 0; i < size; i++)
            names.add(in.readUTF());

        size = in.readInt();

        hosts = new ArrayList<>(size);

        for (int i = 0; i < size; i++)
            hosts.add(in.readUTF());
    }

    /**
     * Converts collection of rich nodes to block location data.
     *
     * @param nodes Collection of affinity nodes.
     */
    private void convertFromNodes(Collection<ClusterNode> nodes) {
        Collection<String> names = new LinkedHashSet<>();
        Collection<String> hosts = new LinkedHashSet<>();
        Collection<UUID> nodeIds = new ArrayList<>(nodes.size());

        for (final ClusterNode node : nodes) {
            // Normalize host names into Hadoop-expected format.
            try {
                Collection<InetAddress> addrs = U.toInetAddresses(node);

                for (InetAddress addr : addrs) {
                    // Resolve once per address instead of on every use.
                    String hostName = addr.getHostName();

                    if (hostName == null)
                        names.add(addr.getHostAddress() + ":" + 9001);
                    else {
                        names.add(hostName + ":" + 9001); // hostname:portNumber
                        hosts.add(hostName);
                    }
                }
            }
            catch (IgniteCheckedException ignored) {
                // Addresses could not be converted: fall back to the node's raw address strings.
                names.addAll(node.addresses());
            }

            nodeIds.add(node.id());
        }

        this.nodeIds = nodeIds;
        this.names = names;
        this.hosts = hosts;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsBlocksMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsBlocksMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsBlocksMessage.java new file mode 100644 index 0000000..90d244a --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsBlocksMessage.java @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.ignite.internal.processors.igfs; + +import org.apache.ignite.internal.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.plugin.extensions.communication.*; + +import java.io.*; +import java.nio.*; +import java.util.*; + +/** + * GGFS write blocks message. Carries a file ID, a batch ID and a map of blocks to store. + */ +public class IgfsBlocksMessage extends IgfsCommunicationMessage { + /** Serial version UID. */ + private static final long serialVersionUID = 0L; + + /** File id. */ + private IgniteUuid fileId; + + /** Batch id. */ + private long id; + + /** Blocks to store. */ + @GridDirectMap(keyType = IgfsBlockKey.class, valueType = byte[].class) + private Map<IgfsBlockKey, byte[]> blocks; + + /** + * Empty constructor required by {@link Externalizable}. + */ + public IgfsBlocksMessage() { + // No-op. + } + + /** + * Creates a message that keeps references to the given arguments (the blocks map is not copied). + * + * @param fileId File ID. + * @param id Batch ID, the value later returned by {@link #id()}. + * @param blocks Blocks to put in cache. + */ + public IgfsBlocksMessage(IgniteUuid fileId, long id, Map<IgfsBlockKey, byte[]> blocks) { + this.fileId = fileId; + this.id = id; + this.blocks = blocks; + } + + /** + * @return File id. + */ + public IgniteUuid fileId() { + return fileId; + } + + /** + * @return Batch id. + */ + public long id() { + return id; + } + + /** + * @return Map of blocks to put in cache. 
+ */ + public Map<IgfsBlockKey, byte[]> blocks() { + return blocks; + } + + /** + * Creates a new {@code IgfsBlocksMessage} through the empty constructor and fills it via {@link #clone0(MessageAdapter)}. + * + * {@inheritDoc} + */ + @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"}) + @Override public MessageAdapter clone() { + IgfsBlocksMessage _clone = new IgfsBlocksMessage(); + + clone0(_clone); + + return _clone; + } + + /** + * Delegates to the superclass first, then copies the file ID, the batch ID and the blocks map reference (the map itself is shared, not copied) into the given message. + * + * {@inheritDoc} + */ + @Override protected void clone0(MessageAdapter _msg) { + super.clone0(_msg); + + IgfsBlocksMessage _clone = (IgfsBlocksMessage)_msg; + + _clone.fileId = fileId; + _clone.id = id; + _clone.blocks = blocks; + } + + /** + * Writes the direct type byte once (guarded by {@code typeWritten}), then the fields in the order {@code blocks}, {@code fileId}, {@code id}. The switch on {@code state} intentionally falls through: a call starts at the field indexed by {@code state}, advances {@code state} after each completed field and returns {@code false} as soon as the writer reports {@code false}. NOTE(review): {@code writer}, {@code state} and {@code typeWritten} are not declared in this class, presumably inherited - confirm against the superclass. + * + * {@inheritDoc} + */ + @SuppressWarnings("all") + @Override public boolean writeTo(ByteBuffer buf) { + writer.setBuffer(buf); + + if (!super.writeTo(buf)) + return false; + + if (!typeWritten) { + if (!writer.writeByte(null, directType())) + return false; + + typeWritten = true; + } + + switch (state) { + case 0: + if (!writer.writeMap("blocks", blocks, IgfsBlockKey.class, byte[].class)) + return false; + + state++; + + case 1: + if (!writer.writeIgniteUuid("fileId", fileId)) + return false; + + state++; + + case 2: + if (!writer.writeLong("id", id)) + return false; + + state++; + + } + + return true; + } + + /** + * Reads the fields in the same order {@link #writeTo(ByteBuffer)} writes them: {@code blocks}, {@code fileId}, {@code id}. The switch on {@code state} intentionally falls through: a call starts at the field indexed by {@code state}, advances {@code state} after each field and returns {@code false} as soon as {@code reader.isLastRead()} is {@code false}. + * + * {@inheritDoc} + */ + @SuppressWarnings("all") + @Override public boolean readFrom(ByteBuffer buf) { + reader.setBuffer(buf); + + if (!super.readFrom(buf)) + return false; + + switch (state) { + case 0: + blocks = reader.readMap("blocks", IgfsBlockKey.class, byte[].class, false); + + if (!reader.isLastRead()) + return false; + + state++; + + case 1: + fileId = reader.readIgniteUuid("fileId"); + + if (!reader.isLastRead()) + return false; + + state++; + + case 2: + id = reader.readLong("id"); + + if (!reader.isLastRead()) + return false; + + state++; + + } + + return true; + } + + /** {@inheritDoc} This implementation returns the constant {@code 66}. */ + @Override public byte directType() { + return 66; + } +}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/141f8282/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsClientSession.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsClientSession.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsClientSession.java new file mode 100644 index 0000000..5da444d --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsClientSession.java @@ -0,0 +1,75 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.processors.igfs;

import org.jdk8.backport.*;
import org.jetbrains.annotations.*;

import java.io.*;
import java.util.*;
import java.util.concurrent.*;

/**
 * GGFS client session. Effectively used to manage lifecycle of opened resources and close them on
 * connection close.
 */
public class IgfsClientSession {
    /** Session resources keyed by resource ID. The map instance is never replaced. */
    private final ConcurrentMap<Long, Closeable> rsrcMap = new ConcurrentHashMap8<>();

    /**
     * Registers resource within this session. A resource already registered under the same ID
     * is kept; it is never replaced.
     *
     * @param rsrcId Resource id.
     * @param rsrc Resource to register.
     * @return {@code True} if the resource was registered, {@code false} if some resource is
     *      already associated with this ID.
     */
    public boolean registerResource(long rsrcId, Closeable rsrc) {
        return rsrcMap.putIfAbsent(rsrcId, rsrc) == null;
    }

    /**
     * Gets registered resource by ID.
     *
     * @param rsrcId Resource ID.
     * @param <T> Type the caller expects. The cast is unchecked, so a mismatch shows up as a
     *      {@link ClassCastException} at the caller's use site, not inside this method.
     * @return Resource or {@code null} if resource was not found.
     */
    @SuppressWarnings("unchecked")
    @Nullable public <T> T resource(Long rsrcId) {
        return (T)rsrcMap.get(rsrcId);
    }

    /**
     * Unregister previously registered resource.
     *
     * @param rsrcId Resource ID.
     * @param rsrc Resource to unregister.
     * @return {@code True} if resource was unregistered, {@code false} if no resource
     *      is associated with this ID or other resource is associated with this ID.
     */
    public boolean unregisterResource(Long rsrcId, Closeable rsrc) {
        return rsrcMap.remove(rsrcId, rsrc);
    }

    /**
     * @return Registered resources iterator.
     */
    public Iterator<Closeable> registeredResources() {
        return rsrcMap.values().iterator();
    }
}
