Author: rajith
Date: Tue Aug 7 11:02:07 2012
New Revision: 1370172
URL: http://svn.apache.org/viewvc?rev=1370172&view=rev
Log:
Fixed minor errors.
Added:
qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/Post.java
qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/SystemOutLogger.java
Modified:
qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/DriverImpl.java
qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/MailServer.java
qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/ServerConnectorImpl.java
qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/engine/impl/SaslClientImpl.java
Modified:
qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/DriverImpl.java
URL:
http://svn.apache.org/viewvc/qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/DriverImpl.java?rev=1370172&r1=1370171&r2=1370172&view=diff
==============================================================================
---
qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/DriverImpl.java
(original)
+++
qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/DriverImpl.java
Tue Aug 7 11:02:07 2012
@@ -204,8 +204,7 @@ public class DriverImpl implements Drive
{
SocketChannel channel = SocketChannel.open();
channel.configureBlocking(false);
- Socket socket = channel.socket();
- socket.bind(new InetSocketAddress(host, port));
+ channel.connect(new InetSocketAddress(host, port));
return createConnector(channel, context);
}
catch (IOException e)
Modified:
qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/MailServer.java
URL:
http://svn.apache.org/viewvc/qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/MailServer.java?rev=1370172&r1=1370171&r2=1370172&view=diff
==============================================================================
---
qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/MailServer.java
(original)
+++
qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/MailServer.java
Tue Aug 7 11:02:07 2012
@@ -53,7 +53,7 @@ public class MailServer
public MailServer() throws Exception
{
- _logger = new Logger();
+ _logger = new SystemOutLogger("TestServer: ");
_driver = new DriverImpl(_logger);
_listener = _driver.createListener("localhost", 5672, State.NEW);
}
@@ -135,7 +135,7 @@ public class MailServer
{
//sasl.(sasl, PN_SASL_AUTH)
}
- }
+ }
state = sasl.getState();
}
@@ -366,87 +366,5 @@ public class MailServer
server.acceptConnections();
server.processConnections();
}
- }
-
- class Logger implements LogHandler
- {
- static final String prefix = "TestServer: ";
-
- @Override
- public boolean isTraceEnabled()
- {
- return true;
- }
-
- @Override
- public void trace(String message)
- {
- System.out.println(prefix + message);
- }
-
- @Override
- public boolean isDebugEnabled()
- {
- return true;
- }
-
- @Override
- public void debug(String message)
- {
- System.out.println(prefix + "DEBUG : " + message);
- }
-
- @Override
- public void debug(Throwable t, String message)
- {
- System.out.println(prefix + "DEBUG : " + message);
- t.printStackTrace();
- }
-
- @Override
- public boolean isInfoEnabled()
- {
- return true;
- }
-
- @Override
- public void info(String message)
- {
- System.out.println(prefix + "INFO : " + message);
- }
-
- @Override
- public void info(Throwable t, String message)
- {
- System.out.println(prefix + "INFO : " + message);
- t.printStackTrace();
- }
-
- @Override
- public void warn(String message)
- {
- System.out.println(prefix + "WARN : " + message);
- }
-
- @Override
- public void warn(Throwable t, String message)
- {
- System.out.println(prefix + "INFO : " + message);
- t.printStackTrace();
- }
-
- @Override
- public void error(String message)
- {
- System.out.println(prefix + "ERROR : " + message);
- }
-
- @Override
- public void error(Throwable t, String message)
- {
- System.out.println(prefix + "INFO : " + message);
- t.printStackTrace();
- }
-
- }
+ }
}
Added:
qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/Post.java
URL:
http://svn.apache.org/viewvc/qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/Post.java?rev=1370172&view=auto
==============================================================================
---
qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/Post.java
(added)
+++
qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/Post.java
Tue Aug 7 11:02:07 2012
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.proton.driver.impl;
+
+import java.util.UUID;
+
+import org.apache.qpid.proton.driver.Connector;
+import org.apache.qpid.proton.driver.Driver;
+import org.apache.qpid.proton.engine.Connection;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.EndpointState;
+import org.apache.qpid.proton.engine.Sasl;
+import org.apache.qpid.proton.engine.Sasl.SaslState;
+import org.apache.qpid.proton.engine.Sender;
+import org.apache.qpid.proton.engine.Session;
+import org.apache.qpid.proton.engine.impl.ConnectionImpl;
+import org.apache.qpid.proton.logging.LogHandler;
+
+public class Post
+{
+ enum State {NEW,AUTHENTICATING,CONNECTION_UP, FAILED};
+
+ private LogHandler _logger;
+ private Driver _driver;
+ private Connector<State> _ctor;
+ private Connection _conn;
+ private Session _ssn;
+ private Sender _sender;
+ private String _mailbox;
+ private String _msg;
+
+ public Post(String mailbox, String msg) throws Exception
+ {
+ _mailbox = mailbox;
+ _msg = msg;
+ _logger = new SystemOutLogger("TestServer: ");
+ _driver = new DriverImpl(_logger);
+
+ // setup a driver connection to the server
+ _ctor = _driver.createConnector("localhost", 5672, State.NEW);
+
+ // configure SASL
+ _ctor.sasl().setMechanisms(new String[]{"ANONYMOUS"});
+
+ // inform the engine about the connection, and link the driver to it.
+ _conn = new ConnectionImpl();
+ ((ConnectionImpl)_conn).setLocalContainerId("Post");
+ _ctor.setConnection(_conn);
+
+ // create a session, and Link for receiving from the mailbox
+ _logger.info("Posting to mailbox " + mailbox);
+ _ssn = _conn.session();
+ _sender = _ssn.sender("sender");
+ _sender.setLocalTargetAddress(mailbox);
+
+ // now open all the engine endpoints
+ _conn.open();
+ _ssn.open();
+ _sender.open();
+ }
+
+ private void send() throws Exception
+ {
+ while (_sender.getCredit() == 0)
+ {
+ await();
+ }
+
+ _logger.info("Sending " + _msg + " to mailbox : " + _mailbox);
+
+ String id = "post-delivery-" + UUID.randomUUID() ;
+ _sender.delivery(id.getBytes(),0,id.getBytes().length);
+ byte[] data = _msg.getBytes();
+ _sender.send(data, 0, data.length);
+ _sender.advance();
+
+ await(); // wait for server to respond.
+
+ //Delivery delivery = _sender.unsettled().next();
+ Delivery delivery = _conn.getWorkHead();
+ while (delivery == null)
+ {
+ await();
+ delivery = _conn.getWorkHead();
+ }
+
+ if (delivery.getRemoteState() != null)
+ {
+ delivery.settle();
+ _logger.info("Server has accepted msg");
+ }
+ else
+ {
+ _logger.info("Server has not accepted the msg!");
+ }
+ }
+
+ private void close() throws Exception
+ {
+ _conn.close();
+ while (_conn.getRemoteState() != EndpointState.CLOSED)
+ {
+ await();
+ }
+ _logger.info("Connection has been closed");
+ }
+
+ private void await() throws Exception
+ {
+ _logger.debug("Waiting for events...");
+
+ // prepare pending outbound data for the network
+ _ctor.process();
+
+ // wait forever for network event(s)
+ _driver.doWait(0);
+
+ // process any data that arrived
+ _ctor.process();
+
+ _logger.debug("...waiting done!");
+ }
+
+ private boolean authenticate() throws Exception
+ {
+ Sasl sasl = _ctor.sasl();
+ while (sasl.getState() != SaslState.PN_SASL_PASS &&
sasl.getState() != SaslState.PN_SASL_FAIL)
+ {
+ await();
+ }
+
+ return sasl.getState() == SaslState.PN_SASL_PASS;
+ }
+
+ public static void main(String[] args) throws Exception
+ {
+ if (args.length == 0)
+ {
+ System.out.println("You need to specify the mailbox and msg
content.");
+ }
+ else if (args.length == 1)
+ {
+ System.out.println("You need to specify msg content.");
+ }
+
+ Post post = new Post(args[0],args[1]);
+
+ if (post.authenticate())
+ {
+ System.out.println("Authentication sucessful");
+ }
+ else
+ {
+ System.out.println("Error: Authentication failure");
+ return;
+ }
+
+ post.send();
+
+ post.close();
+ }
+
+}
Modified:
qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/ServerConnectorImpl.java
URL:
http://svn.apache.org/viewvc/qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/ServerConnectorImpl.java?rev=1370172&r1=1370171&r2=1370172&view=diff
==============================================================================
---
qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/ServerConnectorImpl.java
(original)
+++
qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/ServerConnectorImpl.java
Tue Aug 7 11:02:07 2012
@@ -27,6 +27,7 @@ import org.apache.qpid.proton.driver.Con
import org.apache.qpid.proton.driver.Listener;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.Sasl;
+import org.apache.qpid.proton.engine.Sasl.SaslState;
import org.apache.qpid.proton.logging.LogHandler;
class ServerConnectorImpl<C> implements Connector<C>
@@ -70,6 +71,18 @@ class ServerConnectorImpl<C> implements
public void process()
{
+ if (_channel.isConnectionPending())
+ {
+ try
+ {
+ _channel.finishConnect();
+ }
+ catch (IOException io)
+ {
+ throw new RuntimeException("Exception will trying to complete
connection",io);
+ }
+ }
+
if (!_channel.isOpen())
{
_state = ConnectorState.CLOSED;
@@ -151,13 +164,18 @@ class ServerConnectorImpl<C> implements
int read = 0;
while (read < size)
{
+ offset = read;
switch (_state)
{
case UNINITIALIZED:
- read += readSasl(bytes, offset, size);
+ read += readSasl(bytes, offset, size - offset);
+ if (isSaslDone())
+ {
+ _state = _sasl.getState() == SaslState.PN_SASL_PASS ?
ConnectorState.OPENED : ConnectorState.CLOSED;
+ }
break;
case OPENED:
- read += readAMQPCommands(bytes, offset, size);
+ read += readAMQPCommands(bytes, offset, size - offset);
break;
case EOS:
case CLOSED:
@@ -173,6 +191,10 @@ class ServerConnectorImpl<C> implements
{
case UNINITIALIZED:
writeSasl();
+ if (isSaslDone())
+ {
+ _state = _sasl.getState() == SaslState.PN_SASL_PASS ?
ConnectorState.OPENED : ConnectorState.CLOSED;
+ }
break;
case OPENED:
writeAMQPCommands();
@@ -243,13 +265,13 @@ class ServerConnectorImpl<C> implements
public void setConnection(Connection connection)
{
// write any remaining data on to the wire.
- writeSasl();
+ //writeSasl();
_connection = connection;
// write initial data
- int size = _writeBuffer.array().length - _bytesNotWritten;
- _bytesNotWritten +=
_connection.transport().output(_writeBuffer.array(),
- _bytesNotWritten, size);
- setState(ConnectorState.OPENED);
+ //int size = _writeBuffer.array().length - _bytesNotWritten;
+ //_bytesNotWritten +=
_connection.transport().output(_writeBuffer.array(),
+ // _bytesNotWritten, size);
+ //setState(ConnectorState.OPENED);
}
public C getContext()
@@ -298,4 +320,10 @@ class ServerConnectorImpl<C> implements
{
_state = newState;
}
+
+ private boolean isSaslDone()
+ {
+ SaslState state = _sasl.getState();
+ return state == SaslState.PN_SASL_PASS || state ==
SaslState.PN_SASL_FAIL;
+ }
}
Added:
qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/SystemOutLogger.java
URL:
http://svn.apache.org/viewvc/qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/SystemOutLogger.java?rev=1370172&view=auto
==============================================================================
---
qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/SystemOutLogger.java
(added)
+++
qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/driver/impl/SystemOutLogger.java
Tue Aug 7 11:02:07 2012
@@ -0,0 +1,89 @@
+package org.apache.qpid.proton.driver.impl;
+
+import org.apache.qpid.proton.logging.LogHandler;
+
+public class SystemOutLogger implements LogHandler
+{
+ final String _prefix;
+
+ public SystemOutLogger(String prefix)
+ {
+ _prefix = prefix;
+ }
+
+ @Override
+ public boolean isTraceEnabled()
+ {
+ return true;
+ }
+
+ @Override
+ public void trace(String message)
+ {
+ System.out.println(_prefix + message);
+ }
+
+ @Override
+ public boolean isDebugEnabled()
+ {
+ return true;
+ }
+
+ @Override
+ public void debug(String message)
+ {
+ System.out.println(_prefix + "DEBUG : " + message);
+ }
+
+ @Override
+ public void debug(Throwable t, String message)
+ {
+ System.out.println(_prefix + "DEBUG : " + message);
+ t.printStackTrace();
+ }
+
+ @Override
+ public boolean isInfoEnabled()
+ {
+ return true;
+ }
+
+ @Override
+ public void info(String message)
+ {
+ System.out.println(_prefix + "INFO : " + message);
+ }
+
+ @Override
+ public void info(Throwable t, String message)
+ {
+ System.out.println(_prefix + "INFO : " + message);
+ t.printStackTrace();
+ }
+
+ @Override
+ public void warn(String message)
+ {
+ System.out.println(_prefix + "WARN : " + message);
+ }
+
+ @Override
+ public void warn(Throwable t, String message)
+ {
+ System.out.println(_prefix + "INFO : " + message);
+ t.printStackTrace();
+ }
+
+ @Override
+ public void error(String message)
+ {
+ System.out.println(_prefix + "ERROR : " + message);
+ }
+
+ @Override
+ public void error(Throwable t, String message)
+ {
+ System.out.println(_prefix + "INFO : " + message);
+ t.printStackTrace();
+ }
+}
Modified:
qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/engine/impl/SaslClientImpl.java
URL:
http://svn.apache.org/viewvc/qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/engine/impl/SaslClientImpl.java?rev=1370172&r1=1370171&r2=1370172&view=diff
==============================================================================
---
qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/engine/impl/SaslClientImpl.java
(original)
+++
qpid/proton/branches/rajith_sandbox/proton-j/src/org/apache/qpid/proton/engine/impl/SaslClientImpl.java
Tue Aug 7 11:02:07 2012
@@ -2,6 +2,8 @@ package org.apache.qpid.proton.engine.im
import org.apache.qpid.proton.codec.WritableBuffer;
import org.apache.qpid.proton.engine.SaslClient;
+import org.apache.qpid.proton.engine.Sasl.SaslOutcome;
+import org.apache.qpid.proton.engine.Sasl.SaslState;
import org.apache.qpid.proton.type.Binary;
import org.apache.qpid.proton.type.Symbol;
import org.apache.qpid.proton.type.security.*;
@@ -79,6 +81,7 @@ public class SaslClientImpl extends Sasl
break;
}
}
+ _state = _outcome == SaslOutcome.PN_SASL_OK ? SaslState.PN_SASL_PASS :
SaslState.PN_SASL_FAIL;
_done = true;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]